Repository: sentry Updated Branches: refs/heads/master 3dab053e1 -> 049da445f
SENTRY-2324: Allow sentry to fetch configurable notifications from HMS (Arjun Mishra reviewed by Kalyan Kumar Kalvagadda and Lina li) Change-Id: I0272a0f53e5387e21f9cd7c08a64177467edbafe Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/049da445 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/049da445 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/049da445 Branch: refs/heads/master Commit: 049da445fd5f1320f7b4cde888d8f3433e20e4ba Parents: 3dab053 Author: amishra <amis...@cloudera.com> Authored: Thu Aug 2 11:30:16 2018 -0500 Committer: amishra <amis...@cloudera.com> Committed: Thu Aug 2 11:30:43 2018 -0500 ---------------------------------------------------------------------- .../apache/sentry/hdfs/ServiceConstants.java | 3 + .../db/service/persistent/HMSFollower.java | 22 ++++++- .../service/thrift/HiveNotificationFetcher.java | 2 +- .../db/service/persistent/TestHMSFollower.java | 69 ++++++++++++++++++++ .../thrift/TestHiveNotificationFetcher.java | 33 ++++++++++ 5 files changed, 126 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index a9afb15..2d21411 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -49,6 +49,9 @@ public class ServiceConstants { public static final String SENTRY_HDFS_INTEGRATION_PATH_PREFIXES = "sentry.hdfs.integration.path.prefixes"; public static final String[] SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT = new String[]{"/user/hive/warehouse"}; + + public static final String SENTRY_HMS_FETCH_SIZE = "sentry.hms.fetch.size"; + public static final int SENTRY_HMS_FETCH_SIZE_DEFAULT = -1; } public static class ClientConfig { http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java index fb826cf..b6dca7a 100644 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java @@ -68,6 +68,11 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { private final LeaderStatusMonitor leaderMonitor; /** + * Determine how deep should sentry look for newer notifications + * Default value is -1 which means it gets till the max + */ + private int sentryHMSFetchSize; + /** * Current generation of HMS snapshots. HMSFollower is single-threaded, so no need * to protect against concurrent modification. */ @@ -117,6 +122,14 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName()); PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this); } + + sentryHMSFetchSize = conf.getInt(ServerConfig.SENTRY_HMS_FETCH_SIZE, ServerConfig.SENTRY_HMS_FETCH_SIZE_DEFAULT); + if(sentryHMSFetchSize < 0) { + LOGGER.info("Sentry will fetch from HMS max depth"); + } else { + LOGGER.info("Sentry will fetch from HMS with depth of {}", sentryHMSFetchSize); + } + if(!hdfsSyncEnabled) { try { // Clear all the HMS metadata learned so far and learn it fresh when the feature @@ -225,8 +238,13 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { notificationId = 0L; } - Collection<NotificationEvent> notifications = - notificationFetcher.fetchNotifications(notificationId); + Collection<NotificationEvent> notifications = null; + + if(sentryHMSFetchSize < 0 ) { + notifications = notificationFetcher.fetchNotifications(notificationId); + } else { + notifications = notificationFetcher.fetchNotifications(notificationId, sentryHMSFetchSize); + } // After getting notifications, check if HMS did some clean-up and notifications // are out-of-sync with Sentry. http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java index 8490d7a..ed34f96 100644 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java @@ -73,7 +73,7 @@ public final class HiveNotificationFetcher implements AutoCloseable { * @return A list of newer notifications unseen by Sentry. * @throws Exception If an error occurs on the HMS communication. */ - List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception { + public List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception { NotificationFilter filter = null; /* http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java index 1b4fa47..0d62941 100644 --- a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java +++ b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java @@ -16,6 +16,7 @@ */ package org.apache.sentry.provider.db.service.persistent; +import static org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_HMS_FETCH_SIZE; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -1294,4 +1295,72 @@ public class TestHMSFollower { verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable)); } + + /** + * Constructs events and fetch a portion of them. Make sure that appropriate sentry store API's + * are invoke when the event is processed by hms follower. + * + * @throws Exception + */ + @Test + public void testPartialHMSFetch() throws Exception { + final long SENTRY_PROCESSED_EVENT_ID = 1L; + final long HMS_PROCESSED_EVENT_ID = 1L; + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + //Just fetch 3 notificaitons at a time + int sentryHMSFetchSize = 3; + configuration.setInt(SENTRY_HMS_FETCH_SIZE, sentryHMSFetchSize); + + NotificationEventResponse response = new NotificationEventResponse(); + NotificationEventResponse partialResponse = new NotificationEventResponse(); + + response.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", "")); + partialResponse.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", "")); + response.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", "")); + partialResponse.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", "")); + response.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", "")); + partialResponse.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", "")); + response.addToEvents(new NotificationEvent(4L, 0, "ALTER_TABLE", "")); + response.addToEvents(new NotificationEvent(5L, 0, "ALTER_TABLE", "")); + + when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + Mockito.anyObject())).thenReturn(response); + when(hmsClientMock.getNextNotification(Mockito.eq(0L), Mockito.eq(sentryHMSFetchSize), + Mockito.anyObject())).thenReturn(partialResponse); + when(hmsClientMock.getCurrentNotificationEventId()).thenReturn(new CurrentNotificationEventId(SENTRY_PROCESSED_EVENT_ID)); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should not fetch full snapshot but should fetch notifications from 0 + // and persists them + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + // Making sure that HMS client is invoked to get all the notifications + // starting from event-id 0 + verify(hmsClientMock, times(0)).getNextNotification(Mockito.eq(0L), + Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject()); + verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L), + Mockito.eq(sentryHMSFetchSize), Mockito.anyObject()); + verify(sentryStore, times(1)).persistLastProcessedNotificationID(1L); + verify(sentryStore, times(1)).persistLastProcessedNotificationID(2L); + verify(sentryStore, times(1)).persistLastProcessedNotificationID(3L); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(4L); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(5L); + + reset(sentryStore, hmsClientMock); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java index 83a1bec..e3ebb65 100644 --- a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java +++ b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java @@ -160,4 +160,37 @@ public class TestHiveNotificationFetcher { assertEquals("ALTER_TABLE", events.get(1).getEventType()); } } + + @Test + public void testPartialFetchesFromHMS() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + List<NotificationEvent> events; + + Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), + new NotificationEvent(2L, 0, "CREATE_TABLE", "") + ) + )); + Mockito.when(hmsClient.getNextNotification(0, 1, null)) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", "") + ) + )); + + int sentryHMSFetchSize = 1; + events = fetcher.fetchNotifications(0, sentryHMSFetchSize); + assertEquals(1, events.size()); + assertEquals(1, events.get(0).getEventId()); + assertEquals("CREATE_DATABASE", events.get(0).getEventType()); + } + } }