Repository: sentry
Updated Branches:
  refs/heads/master 3dab053e1 -> 049da445f


SENTRY-2324: Allow sentry to fetch configurable notifications from HMS (Arjun 
Mishra reviewed by  Kalyan Kumar Kalvagadda and  Lina li)

Change-Id: I0272a0f53e5387e21f9cd7c08a64177467edbafe


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/049da445
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/049da445
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/049da445

Branch: refs/heads/master
Commit: 049da445fd5f1320f7b4cde888d8f3433e20e4ba
Parents: 3dab053
Author: amishra <amis...@cloudera.com>
Authored: Thu Aug 2 11:30:16 2018 -0500
Committer: amishra <amis...@cloudera.com>
Committed: Thu Aug 2 11:30:43 2018 -0500

----------------------------------------------------------------------
 .../apache/sentry/hdfs/ServiceConstants.java    |  3 +
 .../db/service/persistent/HMSFollower.java      | 22 ++++++-
 .../service/thrift/HiveNotificationFetcher.java |  2 +-
 .../db/service/persistent/TestHMSFollower.java  | 69 ++++++++++++++++++++
 .../thrift/TestHiveNotificationFetcher.java     | 33 ++++++++++
 5 files changed, 126 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git 
a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
 
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index a9afb15..2d21411 100644
--- 
a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ 
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -49,6 +49,9 @@ public class ServiceConstants {
     public static final String SENTRY_HDFS_INTEGRATION_PATH_PREFIXES = 
"sentry.hdfs.integration.path.prefixes";
     public static final String[] SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT 
=
             new String[]{"/user/hive/warehouse"};
+
+    public static final String SENTRY_HMS_FETCH_SIZE = "sentry.hms.fetch.size";
+    public static final int SENTRY_HMS_FETCH_SIZE_DEFAULT = -1;
   }
 
   public static class ClientConfig {

http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
 
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
index fb826cf..b6dca7a 100644
--- 
a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
+++ 
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
@@ -68,6 +68,11 @@ public class HMSFollower implements Runnable, AutoCloseable, 
PubSub.Subscriber {
   private final LeaderStatusMonitor leaderMonitor;
 
   /**
+   * Determines how many newer notifications Sentry requests from HMS per fetch.
+   * The default value is -1; any negative value means fetch without an explicit limit.
+   */
+  private int sentryHMSFetchSize;
+  /**
    * Current generation of HMS snapshots. HMSFollower is single-threaded, so 
no need
    * to protect against concurrent modification.
    */
@@ -117,6 +122,14 @@ public class HMSFollower implements Runnable, 
AutoCloseable, PubSub.Subscriber {
       LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + 
PubSub.Topic.HDFS_SYNC_HMS.getName());
       PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
     }
+
+    sentryHMSFetchSize = conf.getInt(ServerConfig.SENTRY_HMS_FETCH_SIZE, 
ServerConfig.SENTRY_HMS_FETCH_SIZE_DEFAULT);
+    if(sentryHMSFetchSize < 0) {
+      LOGGER.info("Sentry will fetch from HMS max depth");
+    } else {
+      LOGGER.info("Sentry will fetch from HMS with depth of {}", 
sentryHMSFetchSize);
+    }
+
     if(!hdfsSyncEnabled) {
       try {
         // Clear all the HMS metadata learned so far and learn it fresh when 
the feature
@@ -225,8 +238,13 @@ public class HMSFollower implements Runnable, 
AutoCloseable, PubSub.Subscriber {
         notificationId = 0L;
       }
 
-      Collection<NotificationEvent> notifications =
-          notificationFetcher.fetchNotifications(notificationId);
+      Collection<NotificationEvent> notifications = null;
+
+      if(sentryHMSFetchSize < 0 ) {
+        notifications = notificationFetcher.fetchNotifications(notificationId);
+      } else {
+        notifications = notificationFetcher.fetchNotifications(notificationId, 
sentryHMSFetchSize);
+      }
 
       // After getting notifications, check if HMS did some clean-up and 
notifications
       // are out-of-sync with Sentry.

http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git 
a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
 
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
index 8490d7a..ed34f96 100644
--- 
a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
+++ 
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -73,7 +73,7 @@ public final class HiveNotificationFetcher implements 
AutoCloseable {
    * @return A list of newer notifications unseen by Sentry.
    * @throws Exception If an error occurs on the HMS communication.
    */
-  List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) 
throws Exception {
+  public List<NotificationEvent> fetchNotifications(long lastEventId, int 
maxEvents) throws Exception {
     NotificationFilter filter = null;
 
     /*

http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
 
b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
index 1b4fa47..0d62941 100644
--- 
a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
+++ 
b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java
@@ -16,6 +16,7 @@
  */
 package  org.apache.sentry.provider.db.service.persistent;
 
+import static 
org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_HMS_FETCH_SIZE;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -1294,4 +1295,72 @@ public class TestHMSFollower {
     verify(sentryStore, times(1)).renamePrivilege(authorizable, 
newAuthorizable,
         NotificationProcessor.getPermUpdatableOnRename(authorizable, 
newAuthorizable));
   }
+
+  /**
+   * Constructs events and fetches only a portion of them. Makes sure that the appropriate
+   * sentry store APIs are invoked when the fetched events are processed by the HMS follower.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialHMSFetch() throws Exception {
+    final long SENTRY_PROCESSED_EVENT_ID = 1L;
+    final long HMS_PROCESSED_EVENT_ID = 1L;
+
+    SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+    // Mock that returns a full snapshot
+    Map<String, Collection<String>> snapshotObjects = new HashMap<>();
+    snapshotObjects.put("db", Sets.newHashSet("/db"));
+    snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+    PathsImage fullSnapshot = new PathsImage(snapshotObjects, 
HMS_PROCESSED_EVENT_ID, 1);
+
+    when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+
+    // Just fetch 3 notifications at a time
+    int sentryHMSFetchSize = 3;
+    configuration.setInt(SENTRY_HMS_FETCH_SIZE, sentryHMSFetchSize);
+
+    NotificationEventResponse response = new NotificationEventResponse();
+    NotificationEventResponse partialResponse = new 
NotificationEventResponse();
+
+    response.addToEvents(new NotificationEvent(1L, 0, "CREATE_DATABASE", ""));
+    partialResponse.addToEvents(new NotificationEvent(1L, 0, 
"CREATE_DATABASE", ""));
+    response.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", ""));
+    partialResponse.addToEvents(new NotificationEvent(2L, 0, "CREATE_TABLE", 
""));
+    response.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", ""));
+    partialResponse.addToEvents(new NotificationEvent(3L, 0, "ALTER_TABLE", 
""));
+    response.addToEvents(new NotificationEvent(4L, 0, "ALTER_TABLE", ""));
+    response.addToEvents(new NotificationEvent(5L, 0, "ALTER_TABLE", ""));
+
+    when(hmsClientMock.getNextNotification(Mockito.eq(0L), 
Mockito.eq(Integer.MAX_VALUE),
+        Mockito.anyObject())).thenReturn(response);
+    when(hmsClientMock.getNextNotification(Mockito.eq(0L), 
Mockito.eq(sentryHMSFetchSize),
+        Mockito.anyObject())).thenReturn(partialResponse);
+    when(hmsClientMock.getCurrentNotificationEventId()).thenReturn(new 
CurrentNotificationEventId(SENTRY_PROCESSED_EVENT_ID));
+
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hmsConnectionMock, hiveInstance);
+    hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+    // 1st run should not fetch a full snapshot but should fetch notifications from 0
+    // and persist them
+    
when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
+    hmsFollower.run();
+    verify(sentryStore, times(0)).persistFullPathsImage(
+        fullSnapshot.getPathImage(), fullSnapshot.getId());
+    // Make sure the HMS client is never asked for all notifications (Integer.MAX_VALUE)
+    // and is asked exactly once for sentryHMSFetchSize notifications starting from event-id 0
+    verify(hmsClientMock, times(0)).getNextNotification(Mockito.eq(0L),
+        Mockito.eq(Integer.MAX_VALUE), Mockito.anyObject());
+    verify(hmsClientMock, times(1)).getNextNotification(Mockito.eq(0L),
+        Mockito.eq(sentryHMSFetchSize), Mockito.anyObject());
+    verify(sentryStore, times(1)).persistLastProcessedNotificationID(1L);
+    verify(sentryStore, times(1)).persistLastProcessedNotificationID(2L);
+    verify(sentryStore, times(1)).persistLastProcessedNotificationID(3L);
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(4L);
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(5L);
+
+    reset(sentryStore, hmsClientMock);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/049da445/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git 
a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
 
b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
index 83a1bec..e3ebb65 100644
--- 
a/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
+++ 
b/sentry-service/sentry-service-server/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
@@ -160,4 +160,37 @@ public class TestHiveNotificationFetcher {
       assertEquals("ALTER_TABLE", events.get(1).getEventType());
     }
   }
+
+  @Test
+  public void testPartialFetchesFromHMS() throws Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = 
Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, 
hmsConnection)) {
+      List<NotificationEvent> events;
+
+      Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
+          .thenReturn(new NotificationEventResponse(
+              Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", ""),
+                  new NotificationEvent(2L, 0, "CREATE_TABLE", "")
+              )
+          ));
+      Mockito.when(hmsClient.getNextNotification(0, 1, null))
+          .thenReturn(new NotificationEventResponse(
+              Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", "")
+              )
+          ));
+
+      int sentryHMSFetchSize = 1;
+      events = fetcher.fetchNotifications(0, sentryHMSFetchSize);
+      assertEquals(1, events.size());
+      assertEquals(1, events.get(0).getEventId());
+      assertEquals("CREATE_DATABASE", events.get(0).getEventType());
+    }
+  }
 }

Reply via email to