Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign acf5debe3 -> 9e6d970ce
SENTRY-1794: HMSFollower not persisting last processed notifications when partition is altered (Kalyan Kalvagadda, reviewed by Alex Kolbasov) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/9e6d970c Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/9e6d970c Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/9e6d970c Branch: refs/heads/sentry-ha-redesign Commit: 9e6d970ce1ce5d5c16b24252f609e86d04543876 Parents: acf5deb Author: Alexander Kolbasov <[email protected]> Authored: Wed Jun 14 00:34:55 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Wed Jun 14 00:34:55 2017 -0700 ---------------------------------------------------------------------- .../db/service/persistent/SentryStore.java | 8 +- .../sentry/service/thrift/HMSFollower.java | 15 +- .../service/thrift/NotificationProcessor.java | 6 + .../sentry/service/thrift/TestHMSFollower.java | 236 ++++++++++++++++++- 4 files changed, 250 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/9e6d970c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 187b38b..8b19c88 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -3506,6 +3506,7 @@ public class SentryStore { * Set the notification ID of last processed HMS notification. */ public void persistLastProcessedNotificationID(final Long notificationId) throws Exception { + LOGGER.debug("Persisting Last Processed Notification ID {}", notificationId); tm.executeTransaction( new TransactionBlock<Object>() { public Object execute(PersistenceManager pm) throws Exception { @@ -3552,12 +3553,15 @@ public class SentryStore { * found then return 0. */ public Long getLastProcessedNotificationID() throws Exception { - return tm.executeTransaction( + long notificationId = tm.executeTransaction( new TransactionBlock<Long>() { public Long execute(PersistenceManager pm) throws Exception { - return getLastProcessedNotificationIDCore(pm); + long notificationId = getLastProcessedNotificationIDCore(pm); + return notificationId; } }); + LOGGER.debug("Retrieving Last Processed Notification ID {}", notificationId); + return notificationId; } /** http://git-wip-us.apache.org/repos/asf/sentry/blob/9e6d970c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 78dc0ac..1f7eb18 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -315,6 +315,8 @@ public class HMSFollower implements Runnable, AutoCloseable { // NotificationEventResponse causing TProtocolException. // Workaround: Only processes the notification events newer than the last updated one. CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); + LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}", + eventId.getEventId(), lastProcessedNotificationID); if (eventId.getEventId() > lastProcessedNotificationID) { NotificationEventResponse response = client.getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null); @@ -414,9 +416,12 @@ public class HMSFollower implements Runnable, AutoCloseable { List<String> locations; NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER); try { + LOGGER.debug("Processing notification with id {} and type {}", event.getEventId(), + event.getEventType()); switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { case CREATE_DATABASE: - SentryJSONCreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(event.getMessage()); + SentryJSONCreateDatabaseMessage message = + deserializer.getCreateDatabaseMessage(event.getMessage()); dbName = message.getDB(); location = message.getLocation(); if ((dbName == null) || (location == null)) { @@ -510,9 +515,9 @@ public class HMSFollower implements Runnable, AutoCloseable { StringUtils.defaultIfBlank(newTableName, "null"), StringUtils.defaultIfBlank(newLocation, "null"))); break; - } else if ((oldDbName == newDbName) && - (oldTableName == newTableName) && - (oldLocation == newLocation)) { + } else if ((oldDbName.equalsIgnoreCase(newDbName)) && + (oldTableName.equalsIgnoreCase(newTableName)) && + (oldLocation.equalsIgnoreCase(newLocation))) { isNotificationProcessingSkipped = true; LOGGER.info(String.format("Alter table notification ignored as neither name nor " + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + @@ -593,7 +598,7 @@ public class HMSFollower implements Runnable, AutoCloseable { StringUtils.defaultIfBlank(oldLocation, "null"), StringUtils.defaultIfBlank(newLocation, "null"))); break; - } else if (oldLocation == newLocation) { + } else if (oldLocation.equalsIgnoreCase(newLocation)) { isNotificationProcessingSkipped = true; LOGGER.info(String.format("Alter partition notification ignored as" + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." + http://git-wip-us.apache.org/repos/asf/sentry/blob/9e6d970c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java index de8e2f7..6762de7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java @@ -20,6 +20,7 @@ package org.apache.sentry.service.thrift; import com.google.common.collect.Lists; import org.apache.sentry.hdfs.PathsUpdate; import org.apache.sentry.hdfs.SentryMalformedPathException; +import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.slf4j.Logger; @@ -358,6 +359,11 @@ class NotificationProcessor { Collections.singleton(newPathTree), update); } + } else { + LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping", + oldAuthzObj); + throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" + + "with no change"); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/9e6d970c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java index 74a5afb..66ad2a1 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -19,10 +19,16 @@ package org.apache.sentry.service.thrift; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType; import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; +import org.apache.sentry.hdfs.Updateable; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; + +import java.util.Arrays; + import org.junit.Test; +import org.junit.Ignore; import org.mockito.Mockito; import java.util.ArrayList; @@ -30,6 +36,7 @@ import java.util.List; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.reset; public class TestHMSFollower { SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); @@ -42,7 +49,7 @@ public class TestHMSFollower { // Create notification events NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(), - messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); + messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); @@ -63,7 +70,7 @@ public class TestHMSFollower { // Create notification events NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(), - messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); @@ -75,8 +82,9 @@ public class TestHMSFollower { authorizable.setServer(hiveInstance); authorizable.setDb("db1"); - verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)) ; + verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); } + @Test public void testCreateTable() throws Exception { String dbName = "db1"; @@ -86,7 +94,7 @@ public class TestHMSFollower { StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("hdfs:///db1.db/table1"); NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(), - messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); @@ -101,6 +109,7 @@ public class TestHMSFollower { verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); } + @Test public void testDropTable() throws Exception { String dbName = "db1"; @@ -110,7 +119,7 @@ public class TestHMSFollower { StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("hdfs:///db1.db/table1"); NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(), - messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); @@ -125,6 +134,7 @@ public class TestHMSFollower { verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); } + @Test public void testRenameTable() throws Exception { String dbName = "db1"; @@ -137,9 +147,9 @@ public class TestHMSFollower { StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("hdfs:///db1.db/table1"); NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), - messageFactory.buildAlterTableMessage( - new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), - new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); notificationEvent.setDbName(newDbName); notificationEvent.setTableName(newTableName); List<NotificationEvent> events = new ArrayList<>(); @@ -161,4 +171,214 @@ public class TestHMSFollower { verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, HMSFollower.onRenameSentryPrivilege(authorizable, newAuthorizable)); } + + + @Ignore + @Test + public void testAlterPartitionWithInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName1 = "table1"; + String tableName2 = "table2"; + long inputEventId = 1; + List<NotificationEvent> events = new ArrayList<>(); + NotificationEvent notificationEvent = null; + List<FieldSchema> partCols; + StorageDescriptor sd = null; + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a partition + List<Partition> partitions = new ArrayList<>(); + StorageDescriptor invalidSd = new StorageDescriptor(); + invalidSd.setLocation(null); + Partition partition = new Partition(Arrays.asList("today"), dbName, tableName1, + 0, 0, sd, null); + partitions.add(partition); + notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(), + messageFactory.buildAddPartitionMessage(table, partitions).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + //Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a alter notification with out actually changing anything. + // This is an invalid event and should be processed by sentry store. + // Event Id should be explicitly persisted using persistLastProcessedNotificationID + notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(), + messageFactory.buildAlterPartitionMessage(partition, partition).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that persistLastProcessedNotificationID is invoked explicitly. + verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); + reset(sentryStore); + events.clear(); + + // Create a alter notification with some actual change. + sd = new StorageDescriptor(); + sd.setLocation("hdfs://user/hive/wareshouse/db1.db/table1"); + Partition updatedPartition = new Partition(partition); + updatedPartition.setSd(sd); + notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(), + messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION + // notification and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1); + reset(sentryStore); + events.clear(); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table2"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName2); + events.add(notificationEvent); + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + @Test + public void testAlterTableWithInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName1 = "table1"; + String tableName2 = "table2"; + long inputEventId = 1; + List<NotificationEvent> events = new ArrayList<>(); + NotificationEvent notificationEvent = null; + List<FieldSchema> partCols; + StorageDescriptor sd = null; + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + + // Create alter table notification with actuall changing anything. + // This notification should not be processed by sentry server + // Notification should be persisted explicitly + notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events = new ArrayList<>(); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked + // to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID is explicitly invoked + verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(), + Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table2"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName2); + events.add(notificationEvent); + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } }
