Repository: sentry
Updated Branches:
  refs/heads/master 24ea4a412 -> bfb07b4a5


SENTRY-1888: Sentry might not fetch all HMS duplicate event IDs when requested (Sergio Pena, reviewed by Alexander Kolbasov)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/bfb07b4a
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/bfb07b4a
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/bfb07b4a

Branch: refs/heads/master
Commit: bfb07b4a54ac9a69b1fbea1a44751008e70bd9d5
Parents: 24ea4a4
Author: Sergio Pena <[email protected]>
Authored: Fri Sep 1 09:54:34 2017 -0500
Committer: Sergio Pena <[email protected]>
Committed: Fri Sep 1 09:54:34 2017 -0500

----------------------------------------------------------------------
 .../apache/sentry/hdfs/UniquePathsUpdate.java   |   2 +-
 .../db/service/persistent/SentryStore.java      |  23 +++
 .../sentry/service/thrift/HMSFollower.java      |  50 +++--
 .../service/thrift/HiveNotificationFetcher.java | 198 +++++++++++++++++++
 .../sentry/service/thrift/SentryHMSClient.java  |  59 ------
 .../sentry/service/thrift/TestHMSFollower.java  |  58 ++++--
 .../thrift/TestHiveNotificationFetcher.java     | 163 +++++++++++++++
 .../service/thrift/TestSentryHMSClient.java     | 125 ------------
 8 files changed, 460 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java
----------------------------------------------------------------------
diff --git 
a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java
 
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java
index 7dae2f5..38b4e2a 100644
--- 
a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java
+++ 
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java
@@ -47,7 +47,7 @@ public class UniquePathsUpdate extends PathsUpdate {
     return eventHash;
   }
 
-  private String sha1(NotificationEvent event) {
+  public static String sha1(NotificationEvent event) {
     StringBuilder sb = new StringBuilder();
 
     sb.append(event.getEventId());

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 593b92f..04f6b43 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -4120,4 +4120,27 @@ public class SentryStore {
     tbs.add(transactionBlock);
     tm.executeTransactionBlocksWithRetry(tbs);
   }
+
+  /**
+   * Checks if a notification was already processed by searching for the hash value
+   * on the MSentryPathChange table.
+   *
+   * @param hash A SHA-1 hex hash that represents a unique notification
+   * @return True if the notification was already processed; False otherwise
+   */
+  public boolean isNotificationProcessed(final String hash) throws Exception {
+    return tm.executeTransactionWithRetry(new TransactionBlock<Boolean>() {
+      @Override
+      public Boolean execute(PersistenceManager pm) throws Exception {
+        pm.setDetachAllOnCommit(false);
+        Query query = pm.newQuery(MSentryPathChange.class);
+        query.setFilter("this.notificationHash == hash");
+        query.setUnique(true);
+        query.declareParameters("java.lang.String hash");
+        MSentryPathChange changes = (MSentryPathChange) query.execute(hash);
+
+        return changes != null;
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 35da6fc..d4feb38 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -48,6 +48,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
   private final Configuration authzConf;
   private final SentryStore sentryStore;
   private final NotificationProcessor notificationProcessor;
+  private final HiveNotificationFetcher notificationFetcher;
   private final boolean hdfsSyncEnabled;
 
   private final LeaderStatusMonitor leaderMonitor;
@@ -60,7 +61,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
    * @param leaderMonitor singleton instance of LeaderStatusMonitor
    */
   HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor 
leaderMonitor,
-              HiveSimpleConnectionFactory hiveConnectionFactory) {
+              HiveConnectionFactory hiveConnectionFactory) {
     this(conf, store, leaderMonitor, hiveConnectionFactory, null);
   }
 
@@ -74,7 +75,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
    */
   @VisibleForTesting
   public HMSFollower(Configuration conf, SentryStore store, 
LeaderStatusMonitor leaderMonitor,
-              HiveSimpleConnectionFactory hiveConnectionFactory, String 
authServerName) {
+              HiveConnectionFactory hiveConnectionFactory, String 
authServerName) {
     LOGGER.info("HMSFollower is being initialized");
     authzConf = conf;
     this.leaderMonitor = leaderMonitor;
@@ -88,6 +89,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
     notificationProcessor = new NotificationProcessor(sentryStore, 
authServerName, authzConf);
     client = new SentryHMSClient(authzConf, hiveConnectionFactory);
     hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); 
// no cache to test different settings for hdfs sync
+    notificationFetcher = new HiveNotificationFetcher(sentryStore, 
hiveConnectionFactory);
   }
 
   @VisibleForTesting
@@ -110,6 +112,8 @@ public class HMSFollower implements Runnable, AutoCloseable 
{
         LOGGER.error("Failed to close the Sentry Hms Client", failure);
       }
     }
+
+    notificationFetcher.close();
   }
 
   @Override
@@ -169,7 +173,8 @@ public class HMSFollower implements Runnable, AutoCloseable 
{
         return;
       }
 
-      Collection<NotificationEvent> notifications = 
client.getNotifications(notificationId);
+      Collection<NotificationEvent> notifications =
+          notificationFetcher.fetchNotifications(notificationId);
 
       // After getting notifications, it checks if the HMS did some clean-up 
and notifications
       // are out-of-sync with Sentry.
@@ -214,7 +219,7 @@ public class HMSFollower implements Runnable, AutoCloseable 
{
       return true;
     }
 
-    long currentHmsNotificationId = client.getCurrentNotificationId();
+    long currentHmsNotificationId = 
notificationFetcher.getCurrentNotificationId();
     if (currentHmsNotificationId < latestSentryNotificationId) {
       LOGGER.info("The latest notification ID on HMS is less than the latest 
notification ID "
           + "processed by Sentry. Need to request a full HMS snapshot.");
@@ -240,30 +245,33 @@ public class HMSFollower implements Runnable, 
AutoCloseable {
       return false;
     }
 
+    /*
+     * If the sequence of notifications has a gap, then an out-of-sync might
+     * have happened due to the following issue:
+     *
+     * - HDFS sync was disabled or Sentry was shut down for a time period long enough for
+     * the HMS notification clean-up thread to delete old notifications.
+     *
+     * HMS notifications may contain both gaps in the sequence and duplicates
+     * (the same ID repeated more than once for different events).
+     *
+     * To tolerate duplicates (see HiveNotificationFetcher for more info), a gap is detected
+     * only if the 1st notification received is higher than the latest processed ID + 1.
+     * e.g.
+     *   1st ID = 3, latest ID = 3 (duplicate found but no gap detected)
+     *   1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected)
+     *   1st ID = 5, latest ID = 3 (a gap is detected)
+     */
+
     List<NotificationEvent> eventList = (List<NotificationEvent>) events;
     long firstNotificationId = eventList.get(0).getEventId();
-    long lastNotificationId = eventList.get(eventList.size() - 1).getEventId();
-
-    //
-    // If the next expected notification is not available, then an out-of-sync 
might
-    // have happened due to the following issue:
-    //
-    // - HDFS sync was disabled or Sentry was shutdown for a time period 
longer than
-    // the HMS notification clean-up thread causing old notifications to be 
deleted.
-    //
-    if ((latestProcessedId + 1) != firstNotificationId) {
+
+    if (firstNotificationId > (latestProcessedId + 1)) {
       LOGGER.info("Current HMS notifications are out-of-sync with latest 
Sentry processed"
           + "notifications. Need to request a full HMS snapshot.");
       return true;
     }
 
-    long expectedSize = lastNotificationId - latestProcessedId;
-    if (expectedSize < eventList.size()) {
-      LOGGER.info("The HMS notifications fetched has some gaps in the # of 
events received. These"
-          + "should not cause an out-of-sync issue. (expected = {}, fetched = 
{})",
-          expectedSize, eventList.size());
-    }
-
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
new file mode 100644
index 0000000..4d32992
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -0,0 +1,198 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class used to fetch Hive MetaStore notifications.
+ */
+public final class HiveNotificationFetcher implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveNotificationFetcher.class);
+
+  private final SentryStore sentryStore;
+  private final HiveConnectionFactory hmsConnectionFactory;
+  private HiveMetaStoreClient hmsClient;
+
+  /* The following cache and last filtered ID help us avoid unnecessary calls to the DB */
+  private long lastIdFiltered = 0;
+  private Set<String> cache = new HashSet<>();
+
+  HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory 
hmsConnectionFactory) {
+    this.sentryStore = sentryStore;
+    this.hmsConnectionFactory = hmsConnectionFactory;
+  }
+
+  /**
+   * Fetches new HMS notifications that appeared since the specified event ID. The returned list
+   * may include notifications with the same ID as the specified one if Sentry has not seen them.
+   *
+   * @param lastEventId The event ID to use to request notifications.
+   * @return A list of newer notifications unseen by Sentry.
+   * @throws Exception If an error occurs while communicating with the HMS.
+   */
+  List<NotificationEvent> fetchNotifications(long lastEventId) throws 
Exception {
+    return fetchNotifications(lastEventId, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Fetches new HMS notifications that appeared since the specified event ID. The returned list
+   * may include notifications with the same ID as the specified one if Sentry has not seen them.
+   *
+   * @param lastEventId The event ID to use to request notifications.
+   * @param maxEvents The maximum number of events to fetch.
+   * @return A list of newer notifications unseen by Sentry.
+   * @throws Exception If an error occurs while communicating with the HMS.
+   */
+  List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) 
throws Exception {
+    NotificationFilter filter = null;
+
+    /*
+     * HMS may return duplicated events (same ID) that were committed after the previous request.
+     * To fetch those newer duplicates, we request notifications since the last seen ID - 1.
+     *
+     * A known limitation is that we could miss duplicates committed much later, but because
+     * HMS does not guarantee the order of those, it is safer to avoid processing them.
+     *
+     * TODO: We can avoid doing this once HIVE-16886 is fixed.
+     */
+    if (lastEventId > 0) {
+      filter = createNotificationFilterFor(lastEventId);
+      lastEventId--;
+    }
+
+    LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId);
+    NotificationEventResponse response =
+        getHmsClient().getNextNotification(lastEventId, maxEvents, filter);
+
+    if (response != null && response.isSetEvents()) {
+      LOGGER.debug("Fetched {} new HMS notification(s)", 
response.getEventsSize());
+      return response.getEvents();
+    }
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Returns an HMS notification filter for a specific notification ID. HMS notifications may
+   * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
+   *
+   * @param id the notification ID to filter
+   * @return the HMS notification filter
+   */
+  private NotificationFilter createNotificationFilterFor(final long id) {
+    /*
+     * A SHA-1 hex hash identifying each processed notification is persisted in the Sentry DB.
+     * To avoid unnecessary calls to the DB, we use a cache that keeps seen hashes of the
+     * specified ID. If a new filter ID is used, then we clean up the cache.
+     */
+
+    if (lastIdFiltered != id) {
+      lastIdFiltered = id;
+      cache.clear();
+    }
+
+    return new NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent notificationEvent) {
+        if (notificationEvent.getEventId() == id) {
+          String hash = UniquePathsUpdate.sha1(notificationEvent);
+
+          try {
+            if (cache.contains(hash) || 
sentryStore.isNotificationProcessed(hash)) {
+              cache.add(hash);
+
+              LOGGER.debug("Ignoring HMS notification already processed: ID = 
{}", id);
+              return false;
+            }
+          } catch (Exception e) {
+            LOGGER.error("An error occurred while checking if notification {} 
is already "
+                + "processed: {}", id, e.getMessage());
+
+            // We cannot throw an exception from this filter, so we return false, treating this
+            // notification as already processed (it will be skipped)
+            return false;
+          }
+        }
+
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Gets the HMS client connection object.
+   * It will create a new connection if no connection object exists.
+   *
+   * @return The HMS client used to communicate with the Hive MetaStore.
+   * @throws Exception If it cannot connect to the HMS service.
+   */
+  private HiveMetaStoreClient getHmsClient() throws Exception {
+    if (hmsClient == null) {
+      try {
+        hmsClient = hmsConnectionFactory.connect().getClient();
+      } catch (Exception e) {
+        LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage());
+        throw e;
+      }
+    }
+
+    return hmsClient;
+  }
+
+  /**
+   * @return the latest notification Id logged by the HMS
+   * @throws Exception when an error occurs when talking to the HMS client
+   */
+  long getCurrentNotificationId() throws Exception {
+    CurrentNotificationEventId eventId = 
getHmsClient().getCurrentNotificationEventId();
+    if (eventId != null && eventId.isSetEventId()) {
+      return eventId.getEventId();
+    }
+
+    return SentryStore.EMPTY_NOTIFICATION_ID;
+  }
+
+  /* AutoCloseable implementations */
+
+  @Override
+  public void close() {
+    try {
+      if (hmsClient != null) {
+        hmsClient.close();
+      }
+
+      cache.clear();
+    } finally {
+      hmsClient = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
index 4a8fb95..12bf4a1 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -242,62 +241,4 @@ class SentryHMSClient implements AutoCloseable {
       return emptyMap();
     }
   }
-
-  /**
-   * Returns all HMS notifications with ID greater than the specified one
-   *
-   * @param notificationId ID of the last notification that was processed.
-   * @return Collection of new events to be synced
-   */
-  Collection<NotificationEvent> getNotifications(long notificationId) throws 
Exception {
-    if (client == null) {
-      LOGGER.error(NOT_CONNECTED_MSG);
-      return Collections.emptyList();
-    }
-
-    LOGGER.debug("Checking for notifications beyond {}", notificationId);
-
-    // A bug HIVE-15761 (fixed on Hive 2.4.0) should allow requesting 
notifications using
-    // an unprocessed notification ID without causing an exception. For now, 
we just
-    // leave this workaround and log debug messages.
-    CurrentNotificationEventId eventId = 
client.getCurrentNotificationEventId();
-    LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId());
-    if (eventId != null && eventId.getEventId() < notificationId) {
-      LOGGER.debug("Last notification of HMS is smaller than what sentry 
processed, Something is"
-          + "wrong. Sentry will request a full Snapshot");
-      return Collections.emptyList();
-    }
-
-    if (eventId != null && eventId.getEventId() == notificationId) {
-      return Collections.emptyList();
-    }
-
-    NotificationEventResponse response =
-        client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
-    if (response != null && response.isSetEvents()) {
-      LOGGER.debug("Last Id processed:{}. Received collection of 
notifications, Size:{}",
-          notificationId, response.getEvents().size());
-      return response.getEvents();
-    }
-
-    return Collections.emptyList();
-  }
-
-  /**
-   * @return the latest notification Id logged by the HMS
-   * @throws Exception when an error occurs when talking to the HMS client
-   */
-  long getCurrentNotificationId() throws Exception {
-    if (client == null) {
-      LOGGER.error(NOT_CONNECTED_MSG);
-      return SentryStore.EMPTY_NOTIFICATION_ID;
-    }
-
-    CurrentNotificationEventId eventId = 
client.getCurrentNotificationEventId();
-    if (eventId.isSetEventId()) {
-      return eventId.getEventId();
-    }
-
-    return SentryStore.EMPTY_NOTIFICATION_ID;
-  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index f56384a..35f8316 100644
--- 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -32,9 +33,13 @@ import java.util.Set;
 import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -47,6 +52,7 @@ import org.apache.sentry.hdfs.UniquePathsUpdate;
 import org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -62,6 +68,11 @@ public class TestHMSFollower {
   private final SentryStore sentryStore = Mockito.mock(SentryStore.class);
   private static HiveSimpleConnectionFactory hiveConnectionFactory;
 
+  private final static HiveConnectionFactory hmsConnectionMock
+      = Mockito.mock(HiveConnectionFactory.class);
+  private final static HiveMetaStoreClient hmsClientMock
+      = Mockito.mock(HiveMetaStoreClient.class);
+
   @BeforeClass
   public static void setup() throws IOException, LoginException {
     hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new 
HiveConf());
@@ -73,6 +84,12 @@ public class TestHMSFollower {
     
configuration.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, 
"org.apache.sentry.hdfs.SentryPlugin");
   }
 
+  @Before
+  public void setupMocks() throws Exception {
+    reset(hmsConnectionMock, hmsClientMock);
+    when(hmsConnectionMock.connect()).thenReturn(new HMSClient(hmsClientMock));
+  }
+
   @Test
   public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws 
Exception {
     /*
@@ -91,12 +108,15 @@ public class TestHMSFollower {
     snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
     PathsImage fullSnapshot = new PathsImage(snapshotObjects, 
HMS_PROCESSED_EVENT_ID, 1);
 
+    // Mock that returns the current HMS notification ID
+    when(hmsClientMock.getCurrentNotificationEventId())
+        .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
+
     SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
     when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
-    
when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
-        hiveConnectionFactory, hiveInstance);
+        hmsConnectionMock, hiveInstance);
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot because AuthzPathsMapping is empty
@@ -140,12 +160,15 @@ public class TestHMSFollower {
     snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
     PathsImage fullSnapshot = new PathsImage(snapshotObjects, 
HMS_PROCESSED_EVENT_ID, 1);
 
+    // Mock that returns the current HMS notification ID
+    when(hmsClientMock.getCurrentNotificationEventId())
+        .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
+
     SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
     when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
-    
when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
-        hiveConnectionFactory, hiveInstance);
+        hmsConnectionMock, hiveInstance);
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot
@@ -189,21 +212,29 @@ public class TestHMSFollower {
     snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
     PathsImage fullSnapshot = new PathsImage(snapshotObjects, 
HMS_PROCESSED_EVENT_ID, 1);
 
+    // Mock that returns the current HMS notification ID
+    when(hmsClientMock.getCurrentNotificationEventId())
+        .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
+
     SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
     when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
-    
when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
-    when(sentryHmsClient.getNotifications(SENTRY_PROCESSED_EVENT_ID))
-        .thenReturn(Collections.singletonList(
-            new NotificationEvent(fullSnapshot.getId(), 0, "", "")));
+
+    
when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 
1), Mockito.eq(Integer.MAX_VALUE),
+        (NotificationFilter) Mockito.notNull()))
+        .thenReturn(new NotificationEventResponse(
+            Arrays.<NotificationEvent>asList(
+                new NotificationEvent(fullSnapshot.getId(), 0, "", "")
+            )
+        ));
 
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
-        hiveConnectionFactory, hiveInstance);
+        hmsConnectionMock, hiveInstance);
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot
     when(sentryStore.getLastProcessedNotificationID())
         .thenReturn(SENTRY_PROCESSED_EVENT_ID);
-    when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    when(sentryStore.isHmsNotificationEmpty()).thenReturn(false);
     hmsFollower.run();
     verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), 
Mockito.anyLong());
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
@@ -771,13 +802,16 @@ public class TestHMSFollower {
     snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
     PathsImage fullSnapshot = new PathsImage(snapshotObjects, 
HMS_PROCESSED_EVENT_ID, 1);
 
+    // Mock that returns the current HMS notification ID
+    when(hmsClientMock.getCurrentNotificationEventId())
+        .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
+
     SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
     when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
-    
when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
 
     Configuration configuration = new Configuration();
     HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
-        hiveConnectionFactory, hiveInstance);
+        hmsConnectionMock, hiveInstance);
     hmsFollower.setSentryHmsClient(sentryHmsClient);
 
     // 1st run should get a full snapshot because AuthzPathsMapping is empty

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
new file mode 100644
index 0000000..83a1bec
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java
@@ -0,0 +1,163 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link HiveNotificationFetcher}. Each test wires a mocked
+ * {@link HiveMetaStoreClient} into the fetcher through a mocked
+ * {@link HiveConnectionFactory} and checks the list returned by
+ * {@code fetchNotifications}.
+ */
+public class TestHiveNotificationFetcher {
+  /**
+   * A {@code null} response from HMS {@code getNextNotification} must be reported as an
+   * empty list; the {@code isEmpty()} assertion would throw if {@code null} leaked out.
+   */
+  @Test
+  public void testGetEmptyNotificationsWhenHmsReturnsANullResponse() throws 
Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = 
Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, 
hmsConnection)) {
+      List<NotificationEvent> events;
+
+      Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
+          .thenReturn(null);
+
+      events = fetcher.fetchNotifications(0);
+      assertTrue(events.isEmpty());
+    }
+  }
+
+  /**
+   * An HMS response that carries an empty event list must yield an empty result.
+   */
+  @Test
+  public void testGetEmptyNotificationsWhenHmsReturnsEmptyEvents() throws 
Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = 
Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, 
hmsConnection)) {
+      List<NotificationEvent> events;
+
+      Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
+          .thenReturn(new 
NotificationEventResponse(Collections.<NotificationEvent>emptyList()));
+
+      events = fetcher.fetchNotifications(0);
+      assertTrue(events.isEmpty());
+    }
+  }
+
+  /**
+   * Events returned by HMS for a fetch starting at ID 0 are passed through in order.
+   * The stub only matches {@code getNextNotification(0, Integer.MAX_VALUE, null)}; any
+   * other call returns {@code null} from the mock and the size assertion would fail.
+   * So this test also expects the fetcher to pass no filter when fetching from ID 0.
+   */
+  @Test
+  public void testGetAllNotificationsReturnedByHms() throws Exception {
+    SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = 
Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, 
hmsConnection)) {
+      List<NotificationEvent> events;
+
+      Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null))
+          .thenReturn(new NotificationEventResponse(
+              Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", ""),
+                  new NotificationEvent(2L, 0, "CREATE_TABLE", "")
+              )
+          ));
+
+      events = fetcher.fetchNotifications(0);
+      assertEquals(2, events.size());
+      assertEquals(1, events.get(0).getEventId());
+      assertEquals("CREATE_DATABASE", events.get(0).getEventType());
+      assertEquals(2, events.get(1).getEventId());
+      assertEquals("CREATE_TABLE", events.get(1).getEventType());
+    }
+  }
+
+  /**
+   * A fetch from an ID greater than 0 re-reads HMS from ID 0 with a non-null
+   * {@link NotificationFilter}. The filter must:
+   * <ul>
+   *   <li>drop events whose {@code UniquePathsUpdate.sha1} hash the store reports as
+   *       already processed;</li>
+   *   <li>keep other events, even when they share an event ID that was already seen
+   *       (two distinct events carry ID 1 here).</li>
+   * </ul>
+   *
+   * NOTE(review): {@code store.isNotificationProcessed} is stubbed inside the Answer,
+   * i.e. while the fetcher's HMS call is in progress. Stubbing it before calling
+   * {@code fetchNotifications} would be easier to follow. The Answer's local
+   * {@code events} also shadows the test's own {@code events} variable.
+   */
+  @Test
+  public void testGetDuplicatedEventsAndFilterEventsAlreadySeen() throws 
Exception {
+    final SentryStore store = Mockito.mock(SentryStore.class);
+    HiveConnectionFactory hmsConnection = 
Mockito.mock(HiveConnectionFactory.class);
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+
+    Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient));
+
+    try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, 
hmsConnection)) {
+      List<NotificationEvent> events;
+
+      /*
+       * Requesting an ID > 0 will request all notifications from 0 again but 
filter those
+       * already seen notifications with ID = 1
+       */
+
+      // This mock will also test that the NotificationFilter works as expected
+      Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), 
Mockito.eq(Integer.MAX_VALUE),
+          (NotificationFilter) Mockito.notNull())).thenAnswer(new 
Answer<NotificationEventResponse>() {
+            @Override
+            public NotificationEventResponse answer(InvocationOnMock 
invocation)
+                throws Throwable {
+              // The third argument is the filter built by the fetcher.
+              NotificationFilter filter = (NotificationFilter) 
invocation.getArguments()[2];
+              NotificationEventResponse response = new 
NotificationEventResponse();
+
+              List<NotificationEvent> events = 
Arrays.<NotificationEvent>asList(
+                  new NotificationEvent(1L, 0, "CREATE_DATABASE", ""),
+                  new NotificationEvent(1L, 0, "CREATE_TABLE", ""),
+                  new NotificationEvent(2L, 0, "ALTER_TABLE", "")
+              );
+
+              for (NotificationEvent event : events) {
+                String hash = UniquePathsUpdate.sha1(event);
+                
+                // We simulate that CREATE_DATABASE is already processed
+                if (event.getEventType().equals("CREATE_DATABASE")) {
+                  
Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(true);
+                } else {
+                  
Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(false);
+                }
+
+                // Return only the events the fetcher-supplied filter accepts.
+                if (filter.accept(event)) {
+                  response.addToEvents(event);
+                }
+              }
+
+              return response;
+            }
+          });
+
+      // CREATE_DATABASE (ID 1) is marked as processed and is filtered out.
+      // CREATE_TABLE (also ID 1) and ALTER_TABLE (ID 2) must remain, in order.
+      events = fetcher.fetchNotifications(1);
+      assertEquals(2, events.size());
+      assertEquals(1, events.get(0).getEventId());
+      assertEquals("CREATE_TABLE", events.get(0).getEventType());
+      assertEquals(2, events.get(1).getEventId());
+      assertEquals("ALTER_TABLE", events.get(1).getEventType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
index 77a2bbb..38668ca 100644
--- 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
+++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java
@@ -25,20 +25,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
-import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
 import org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.thrift.TException;
 import org.junit.Assert;
@@ -56,7 +50,6 @@ import javax.security.auth.login.LoginException;
 public class TestSentryHMSClient {
 
   private static final Configuration conf = new Configuration();
-  private static final SentryJSONMessageFactory messageFactory = new 
SentryJSONMessageFactory();
   private static SentryHMSClient client;
   private static MockHMSClientFactory hiveConnectionFactory;
 
@@ -108,36 +101,6 @@ public class TestSentryHMSClient {
     return partition;
   }
 
-  /**
-   * Creates create database notification
-   *
-   * @return NotificationEvent
-   */
-  private static NotificationEvent getCreateDatabaseNotification(long id) {
-    Random rand = new Random();
-    int n = rand.nextInt(100) + 1;
-    String dbName = "db" + n;
-    return new NotificationEvent(id, 0, 
HCatEventMessage.EventType.CREATE_DATABASE.toString(),
-        messageFactory
-            .buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///" 
+ dbName, null))
-            .toString());
-  }
-
-  /**
-   * Creates drop database notification
-   *
-   * @return NotificationEvent
-   */
-  private static NotificationEvent getDropDatabaseNotification(long id) {
-    Random rand = new Random();
-    int n = rand.nextInt(100) + 1;
-    String dbName = "db" + n;
-    return new NotificationEvent(id, 0, 
HCatEventMessage.EventType.DROP_DATABASE.toString(),
-        messageFactory
-            .buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///" + 
dbName, null))
-            .toString());
-  }
-
   @BeforeClass
   static public void initialize() throws IOException, LoginException {
     hiveConnectionFactory = new MockHMSClientFactory();
@@ -235,94 +198,6 @@ public class TestSentryHMSClient {
   }
 
   /**
-   * Test scenario when there is no HMS connection
-   * Getting new notifications
-   */
-  @Test
-  public void testGetNewNotificationsWithOutClientConnected() throws Exception 
{
-    HiveTable tab21 = new HiveTable("tab21");
-    HiveTable tab31 = new HiveTable("tab31");
-    HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
-    HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
-    HiveDb db1 = new HiveDb("db1");
-    client.setClient(null);
-    HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
-    MockClient mockClient = new MockClient(snap, 100);
-    Mockito.when(mockClient.client.getCurrentNotificationEventId()).
-        thenReturn(new CurrentNotificationEventId(mockClient.eventId));
-    // Make sure that client is not connected
-    Assert.assertTrue(!client.isConnected());
-    Collection<NotificationEvent> events = client.getNotifications(100);
-    Assert.assertTrue(events.isEmpty());
-
-  }
-
-  /**
-   * Test scenario where there are no notifications
-   * Getting new notifications
-   */
-  @Test
-  public void testGetNewNotificationsWithNoHmsUpdates() throws Exception {
-    HiveTable tab21 = new HiveTable("tab21");
-    HiveTable tab31 = new HiveTable("tab31");
-    HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
-    HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
-    HiveDb db1 = new HiveDb("db1");
-    HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
-    MockClient mockClient = new MockClient(snap, 100);
-    Mockito.when(mockClient.client.getCurrentNotificationEventId()).
-        thenReturn(new CurrentNotificationEventId(mockClient.eventId));
-    client.setClient(mockClient.client);
-    hiveConnectionFactory.setClient(mockClient);
-    // Make sure that client is connected
-    Assert.assertTrue(client.isConnected());
-    Collection<NotificationEvent> events = client.getNotifications(100);
-    Assert.assertTrue(events.isEmpty());
-  }
-
-  /**
-   * Test scenario where there are notifications
-   * Getting new notifications
-   */
-  @Test
-  public void testGetNewNotificationsSuccess() throws Exception {
-    final MockClient mockClient = new MockClient(new HiveSnapshot(), 100);
-    client.setClient(mockClient.client);
-    hiveConnectionFactory.setClient(mockClient);
-    // Make sure that client is connected
-    Assert.assertTrue(client.isConnected());
-
-    Mockito.when(mockClient.client.getCurrentNotificationEventId()).
-        thenAnswer(new Answer<CurrentNotificationEventId>() {
-          @Override
-          public CurrentNotificationEventId answer(InvocationOnMock invocation)
-              throws Throwable {
-            return new 
CurrentNotificationEventId(mockClient.incrementNotificationEventId());
-          }
-        });
-    Mockito.when(mockClient.client.getNextNotification(Mockito.anyLong(), 
Mockito.anyInt(),
-        Mockito.any(NotificationFilter.class))).
-        thenAnswer(new Answer<NotificationEventResponse>() {
-          @Override
-          public NotificationEventResponse answer(InvocationOnMock invocation)
-              throws Throwable {
-            long id = 1;
-            List<NotificationEvent> events = new ArrayList<>();
-            events.add(getCreateDatabaseNotification(id++));
-            events.add(getDropDatabaseNotification(id++));
-            return new NotificationEventResponse(events);
-          }
-        });
-
-    Collection<NotificationEvent> events = client.getNotifications(100);
-    long id = 1;
-    for (NotificationEvent event : events) {
-      Assert.assertEquals(event.getEventId(), id++);
-    }
-    Assert.assertTrue(events.size() == 2);
-  }
-
-  /**
    * Representation of a Hive table. A table has a name and a list of 
partitions.
    */
   private static class HiveTable {

Reply via email to