SENTRY-1819: HMSFollower and friends do not belong in sentry.service.thrift 
(Xinran Tinney, reviewed by Sergio Pena, kalyan kumar kalvagadda, Na Li, Steve 
Moist)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c4226f64
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c4226f64
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c4226f64

Branch: refs/heads/master
Commit: c4226f649d71697e24ca9540907cdecd9bcdb9f5
Parents: b87651c
Author: Sergio Pena <[email protected]>
Authored: Thu Jan 18 09:58:17 2018 -0600
Committer: Sergio Pena <[email protected]>
Committed: Thu Jan 18 09:58:17 2018 -0600

----------------------------------------------------------------------
 .../org/apache/sentry/hdfs/SentryPlugin.java    |    2 +-
 .../db/service/persistent/CounterWait.java      |  341 ++++++
 .../db/service/persistent/HMSFollower.java      |  495 ++++++++
 .../service/persistent/LeaderStatusMonitor.java |  286 +++++
 .../persistent/NotificationProcessor.java       |  773 +++++++++++++
 .../db/service/persistent/SentryStore.java      |    1 -
 .../sentry/service/thrift/CounterWait.java      |  341 ------
 .../apache/sentry/service/thrift/HMSClient.java |    4 +-
 .../sentry/service/thrift/HMSFollower.java      |  490 --------
 .../service/thrift/HiveNotificationFetcher.java |    6 +-
 .../thrift/HiveSimpleConnectionFactory.java     |    2 +-
 .../service/thrift/LeaderStatusMonitor.java     |  287 -----
 .../service/thrift/NotificationProcessor.java   |  773 -------------
 .../sentry/service/thrift/SentryHMSClient.java  |    8 +-
 .../sentry/service/thrift/SentryService.java    |    2 +
 .../db/service/persistent/TestCounterWait.java  |  107 ++
 .../db/service/persistent/TestHMSFollower.java  | 1064 ++++++++++++++++++
 .../TestHMSFollowerSentryStoreIntegration.java  |    1 -
 .../persistent/TestLeaderStatusMonitor.java     |  200 ++++
 .../persistent/TestNotificationProcessor.java   |  488 ++++++++
 .../sentry/service/thrift/TestCounterWait.java  |  107 --
 .../sentry/service/thrift/TestHMSFollower.java  | 1061 -----------------
 .../service/thrift/TestLeaderStatusMonitor.java |  201 ----
 .../thrift/TestNotificationProcessor.java       |  488 --------
 24 files changed, 3767 insertions(+), 3761 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index eee4489..cf764ed 100644
--- 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -41,7 +41,7 @@ import 
org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
 import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest;
 import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
-import org.apache.sentry.service.thrift.HMSFollower;
+import org.apache.sentry.provider.db.service.persistent.HMSFollower;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/CounterWait.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/CounterWait.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/CounterWait.java
new file mode 100644
index 0000000..d8c8297
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/CounterWait.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import org.apache.http.annotation.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Waiting for counter to reach certain value.
+ * The counter starts from zero and its value increases over time.
+ * The class allows for multiple consumers waiting until the value of the
+ * counter reaches some value interesting to them.
+ * Consumers call {@link #waitFor(long)} which may either return
+ * immediately if the counter reached the specified value, or block
+ * until this value is reached. Consumers can also specify timeout for the
+ * {@link #waitFor(long)} in which case it may return {@link TimeoutException}
+ * when the wait was not successful within the specified time limit.
+ * <p>
+ * All waiters should be woken up when the counter becomes equal to or higher
+ * than the value they are waiting for.
+ * <p>
+ * The counter is updated by a single updater that should only increase the
+ * counter value.
+ * The updater calls the {@link #update(long)} method to update the counter
+ * value and this should wake up all threads waiting for any value smaller or
+ * equal to the new one.
+ * <p>
+ * The class is thread-safe.
+ * It is designed for use by multiple waiter threads and a single
+ * updater thread, but it will work correctly even in the presence of multiple
+ * updater threads.
+ */
+@ThreadSafe
+public final class CounterWait {
+  // Implementation notes.
+  //
+  // The implementation is based on:
+  //
+  // 1) Using an atomic counter value which guarantees consistency.
+  //    Since everyone needs only to know when the counter value reached the
+  //    certain value and the counter may only increase its value,
+  //    it is safe to update the counter by another thread after its value
+  //    was read.
+  //
+  // 2) Priority queue of waiters, sorted by their expected values. The 
smallest
+  //    value is always at the top of the queue. The priority queue itself
+  //    is thread-safe, so no locks are needed to protect access to it.
+  //
+  // Each waiter is implemented using a binary semaphore.
+  // This solves the problem of a wakeup that happens before the sleep -
+  // in this case the acquire() doesn't block and returns immediately.
+  //
+  // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe,
+  //       we are not using its blocking queue semantics.
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CounterWait.class);
+
+  /** Counter value. May only increase. */
+  private final AtomicLong currentId = new AtomicLong(0);
+
+  private final long waitTimeout;
+  private final TimeUnit waitTimeUnit;
+
+  /**
+   * Waiters sorted by the value of the counter they are waiting for.
+   * Note that {@link PriorityBlockingQueue} is thread-safe.
+   * We are not using this as a blocking queue, but as a synchronized
+   * PriorityQueue.
+   */
+  private final PriorityBlockingQueue<ValueEvent> waiters =
+          new PriorityBlockingQueue<>();
+
+  /**
+   * Create an instance of CounterWait object that will not timeout during wait
+   */
+  public CounterWait() {
+    this(0, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Create an instance of CounterWait object that will timeout during wait
+   * @param waitTimeoutSec maximum time in seconds to wait for counter
+   */
+  public CounterWait(long waitTimeoutSec) {
+    this(waitTimeoutSec, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Create an instance of CounterWait object that will timeout during wait
+   * @param waitTimeout maximum time to wait for counter
+   * @param waitTimeUnit time units for wait
+   */
+  public CounterWait(long waitTimeout, TimeUnit waitTimeUnit) {
+    this.waitTimeout = waitTimeout;
+    this.waitTimeUnit = waitTimeUnit;
+  }
+
+  /**
+   * Update the counter value and wake up all threads waiting for this
+   * value or any value below it.
+   * <p>
+   * The counter value should only increase.
+   * An attempt to decrease the value is ignored.
+   *
+   * @param newValue the new counter value
+   */
+  public synchronized void update(long newValue) {
+    // update() is synchronized so the value can't change.
+    long oldValue = currentId.get();
+    LOGGER.debug("CounterWait update: oldValue = {}, newValue = {}", oldValue, 
newValue);
+    // Avoid doing extra work if not needed
+    if (oldValue == newValue) {
+      return; // no-op
+    }
+
+    // Make sure the counter is never decremented.
+    if (newValue < oldValue) {
+      LOGGER.error("new counter value {} is smaller then the previous one {}",
+              newValue, oldValue);
+      return; // no-op
+    }
+
+    currentId.set(newValue);
+
+    // Wake up any threads waiting for a counter to reach this value.
+    wakeup(newValue);
+  }
+
+  /**
+   * Explicitly reset the counter value to a new value, but allow setting to a
+   * smaller value.
+   * This should be used when we have some external event that resets the 
counter
+   * value space.
+   * @param newValue New counter value. If this is greater than or equal to the 
current
+   *                value, this is equivalent to {@link #update(long)}. 
Otherwise
+   *                 sets the counter to the new smaller value.
+   */
+  public synchronized void reset(long newValue) {
+    long oldValue = currentId.get();
+    LOGGER.debug("CounterWait reset: oldValue = {}, newValue = {}", oldValue, 
newValue);
+
+    if (newValue > oldValue) {
+      update(newValue);
+    } else if (newValue < oldValue) {
+      LOGGER.warn("resetting counter from {} to smaller value {}",
+              oldValue, newValue);
+      currentId.set(newValue);
+      // No need to wakeup waiters since no one should wait on the smaller 
value
+    }
+  }
+
+
+  /**
+   * Wait for specified counter value.
+   * Returns immediately if the value is reached or blocks until the value
+   * is reached.
+   * Multiple threads can call the method concurrently.
+   *
+   * @param value requested counter value
+   * @return current counter value that should be no smaller than the requested
+   * value
+   * @throws InterruptedException if the wait was interrupted, 
TimeoutException if
+   * wait was not successful within the timeout value specified at the 
construction time.
+   */
+  public long waitFor(long value) throws InterruptedException, 
TimeoutException {
+    // Fast path - counter value already reached, no need to block
+    if (value <= currentId.get()) {
+      return currentId.get();
+    }
+
+    // Enqueue the waiter for this value
+    ValueEvent eid = new ValueEvent(value);
+    waiters.put(eid);
+
+    // It is possible that between the fast path check and the time the
+    // value event is enqueued, the counter value already reached the requested
+    // value. In this case we return immediately.
+    if (value <= currentId.get()) {
+      return currentId.get();
+    }
+
+    // At this point we may be sure that by the time the event was enqueued,
+    // the counter was below the requested value. This means that update()
+    // is guaranteed to wake us up when the counter reaches the requested 
value.
+    // The wake up may actually happen before we start waiting, in this case
+    // the event's semaphore will already be released and the waitFor() below
+    // will not block, so it is safe to wake up before the wait.
+    // So sit tight and wait patiently.
+    eid.waitFor();
+    LOGGER.debug("CounterWait added new value to waitFor: value = {}, 
currentId = {}", value, currentId.get());
+    return currentId.get();
+  }
+
+  /**
+   * Wake up any threads waiting for a counter to reach specified value
+   * Poll the head of the queue. If the queue is empty, we are done. If the
+   * head value exceeds the current value, put it back into the queue and
+   * stop. Otherwise wake up the corresponding waiter and continue.
+   * <p>
+   * Note that {@link #waitFor(long)} never removes its own waiter; a waiter
+   * whose wait timed out stays queued until the counter reaches its value
+   * and is then polled and released here.
+   *
+   * @param value current counter value
+   */
+  private void wakeup(long value) {
+    while (true) {
+      // Get the top of the waiters queue or null if it is empty
+      ValueEvent e = waiters.poll();
+      if (e == null) {
+        // Queue is empty - return.
+        return;
+      }
+      // No one to wake up, return event to the queue and exit
+      if (e.getValue() > value) {
+        waiters.add(e);
+        return;
+      }
+      // Due for wake-up call
+      LOGGER.debug("CounterWait wakeup: event = {} is less than value = {}", 
e.getValue(), value);
+      e.wakeup();
+    }
+  }
+
+  // Useful for debugging
+  @Override
+  public String toString() {
+    return "CounterWait{" + "currentId=" + currentId +
+            ", waiters=" + waiters + "}";
+  }
+
+  /**
+   * Return number of waiters. This is mostly useful for metrics/debugging
+   *
+   * @return number of sleeping waiters
+   */
+  public int waitersCount() {
+    return waiters.size();
+  }
+
+  /**
+   * Representation of the waiting event.
+   * The waiting event consists of the expected value and a binary semaphore.
+   * <p>
+   * Each thread waiting for the given value, creates a ValueEvent and tries
+   * to acquire a semaphore. This blocks until the semaphore is released.
+   * <p>
+   * ValueEvents are stored in priority queue sorted by value, so they should 
be
+   * comparable by the value.
+   */
+  private class ValueEvent implements Comparable<ValueEvent> {
+    /** Value waited for. */
+    private final long value;
+    /** Binary semaphore to synchronize waiters */
+    private final Semaphore semaphore = new Semaphore(1);
+
+    /**
+     * Instantiates a new Value event.
+     *
+     * @param v the expected value
+     */
+    ValueEvent(long v) {
+      this.value = v;
+      // Acquire the semaphore. Subsequent calls to waitFor() will block until
+      // wakeup() releases the semaphore.
+      semaphore.acquireUninterruptibly(); // Will not block
+    }
+
+    /** Wait until signaled or interrupted. May return immediately if already 
signalled. */
+    void waitFor() throws InterruptedException, TimeoutException {
+      if (waitTimeout == 0) {
+        semaphore.acquire();
+        return;
+      }
+      if (!semaphore.tryAcquire(waitTimeout, waitTimeUnit)) {
+        throw new TimeoutException();
+      }
+    }
+
+    /** @return the value we are waiting for. */
+    long getValue() {
+      return value;
+    }
+
+    /** Wakeup the waiting thread. */
+    void wakeup() {
+      semaphore.release();
+    }
+
+    /**
+     * Compare objects by value.
+     */
+    @Override
+    public int compareTo(final ValueEvent o) {
+      return value == o.value ? 0
+              : value < o.value ? -1
+              : 1;
+    }
+
+    /**
+     * Use identity comparison of objects.
+     */
+    @Override
+    public boolean equals(final Object o) {
+      return (this == o);
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (value ^ (value >>> 32));
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
new file mode 100644
index 0000000..2f2b984
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
@@ -0,0 +1,495 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import org.apache.sentry.core.common.utils.PubSub;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jdo.JDODataStoreException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.thrift.TException;
+import org.apache.sentry.service.thrift.SentryHMSClient;
+import org.apache.sentry.service.thrift.HiveConnectionFactory;
+import org.apache.sentry.service.thrift.HiveNotificationFetcher;
+import org.apache.sentry.service.thrift.SentryServiceUtil;
+import org.apache.sentry.service.thrift.SentryStateBank;
+import org.apache.sentry.service.thrift.SentryServiceState;
+import org.apache.sentry.service.thrift.HMSFollowerState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HMSFollower is the thread which follows the Hive MetaStore state changes 
from Sentry.
+ * It gets the full update and notification logs from HMS and applies it to
+ * update permissions stored in Sentry using SentryStore and also update the 
&lt;obj, path&gt; state
+ * stored for HDFS-Sentry sync.
+ */
+public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber 
{
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HMSFollower.class);
+  private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
+  private static boolean connectedToHms = false;
+
+  private SentryHMSClient client;
+  private final Configuration authzConf;
+  private final SentryStore sentryStore;
+  private final NotificationProcessor notificationProcessor;
+  private boolean readyToServe;
+  private final HiveNotificationFetcher notificationFetcher;
+  private final boolean hdfsSyncEnabled;
+  private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
+
+  private final LeaderStatusMonitor leaderMonitor;
+
+  /**
+   * Current generation of HMS snapshots. HMSFollower is single-threaded, so 
no need
+   * to protect against concurrent modification.
+   */
+  private long hmsImageId = SentryStore.EMPTY_PATHS_SNAPSHOT_ID;
+
+  /**
+   * Configuring Hms Follower thread.
+   *
+   * @param conf sentry configuration
+   * @param store sentry store
+   * @param leaderMonitor singleton instance of LeaderStatusMonitor
+   */
+  public HMSFollower(Configuration conf, SentryStore store, 
LeaderStatusMonitor leaderMonitor,
+              HiveConnectionFactory hiveConnectionFactory) {
+    this(conf, store, leaderMonitor, hiveConnectionFactory, null);
+  }
+
+  /**
+   * Constructor should be used only for testing purposes.
+   *
+   * @param conf sentry configuration
+   * @param store sentry store
+   * @param leaderMonitor
+   * @param authServerName Server that sentry is Authorizing
+   */
+  @VisibleForTesting
+  public HMSFollower(Configuration conf, SentryStore store, 
LeaderStatusMonitor leaderMonitor,
+              HiveConnectionFactory hiveConnectionFactory, String 
authServerName) {
+    LOGGER.info("HMSFollower is being initialized");
+    readyToServe = false;
+    authzConf = conf;
+    this.leaderMonitor = leaderMonitor;
+    sentryStore = store;
+
+    if (authServerName == null) {
+      authServerName = conf.get(AUTHZ_SERVER_NAME.getVar(),
+        conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(), 
AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
+    }
+
+    notificationProcessor = new NotificationProcessor(sentryStore, 
authServerName, authzConf);
+    client = new SentryHMSClient(authzConf, hiveConnectionFactory);
+    hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); 
// no cache to test different settings for hdfs sync
+    notificationFetcher = new HiveNotificationFetcher(sentryStore, 
hiveConnectionFactory);
+
+    // subscribe to full update notification
+    if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, 
false)) {
+      LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + 
PubSub.Topic.HDFS_SYNC_HMS.getName());
+      PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
+    }
+
+  }
+
+  @VisibleForTesting
+  public static boolean isConnectedToHms() {
+    return connectedToHms;
+  }
+
+  @VisibleForTesting
+  void setSentryHmsClient(SentryHMSClient client) {
+    this.client = client;
+  }
+
+  @Override
+  public void close() {
+    if (client != null) {
+      // Close any outstanding connections to HMS
+      try {
+        client.disconnect();
+        
SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED);
+      } catch (Exception failure) {
+        LOGGER.error("Failed to close the Sentry Hms Client", failure);
+      }
+    }
+
+    notificationFetcher.close();
+  }
+
+  @Override
+  public void run() {
+    
SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
+    long lastProcessedNotificationId;
+    try {
+      try {
+        // Initializing lastProcessedNotificationId based on the latest 
persisted notification ID.
+        lastProcessedNotificationId = 
sentryStore.getLastProcessedNotificationID();
+      } catch (Exception e) {
+        LOGGER.error("Failed to get the last processed notification id from 
sentry store, "
+            + "Skipping the processing", e);
+        return;
+      }
+      // Wake any clients connected to this service waiting for HMS already 
processed notifications.
+      wakeUpWaitingClientsForSync(lastProcessedNotificationId);
+      // Only the leader should listen to HMS updates
+      if (!isLeader()) {
+        // Close any outstanding connections to HMS
+        close();
+        return;
+      }
+      syncupWithHms(lastProcessedNotificationId);
+    } finally {
+      
SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
+    }
+  }
+
+  private boolean isLeader() {
+    return (leaderMonitor == null) || leaderMonitor.isLeader();
+  }
+
+  @VisibleForTesting
+  String getAuthServerName() {
+    return notificationProcessor.getAuthServerName();
+  }
+
+  /**
+   * Processes new Hive Metastore notifications.
+   *
+   * <p>If no notifications are processed yet, then it
+   * does a full initial snapshot of the Hive Metastore followed by new 
notifications updates that
+   * could have happened after it.
+   *
+   * <p>Clients connections waiting for an event notification will be
+   * woken up afterwards.
+   */
+  private void syncupWithHms(long notificationId) {
+    try {
+      client.connect();
+      connectedToHms = true;
+      
SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED);
+    } catch (Throwable e) {
+      LOGGER.error("HMSFollower cannot connect to HMS!!", e);
+      return;
+    }
+
+    try {
+      /* Before getting notifications, it checks if a full HMS snapshot is 
required. */
+      if (isFullSnapshotRequired(notificationId)) {
+        createFullSnapshot();
+        return;
+      }
+
+      Collection<NotificationEvent> notifications =
+          notificationFetcher.fetchNotifications(notificationId);
+
+      // After getting notifications, it checks if the HMS did some clean-up 
and notifications
+      // are out-of-sync with Sentry.
+      if (areNotificationsOutOfSync(notifications, notificationId)) {
+        createFullSnapshot();
+        return;
+      }
+
+      if (!readyToServe) {
+        // Allow users and/or applications who look into the Sentry console 
output to see
+        // when Sentry is ready to serve.
+        System.out.println("Sentry HMS support is ready");
+        readyToServe = true;
+      }
+
+      // Continue with processing new notifications if no snapshots are done.
+      processNotifications(notifications);
+    } catch (TException e) {
+      LOGGER.error("An error occurred while fetching HMS notifications: ", e);
+      close();
+    } catch (Throwable t) {
+      // catching errors to prevent the executor to halt.
+      LOGGER.error("Exception in HMSFollower! Caused by: " + t.getMessage(), 
t);
+
+      close();
+    }
+  }
+
+  /**
+   * Checks if a new full HMS snapshot request is needed by checking if:
+   * <ul>
+   *   <li>Sentry HMS Notification table is EMPTY</li>
+   *   <li>HDFSSync is enabled and Sentry Authz Snapshot table is EMPTY</li>
+   *   <li>The current notification Id on the HMS is less than the
+   *   latest processed by Sentry.</li>
+   *   <li>Full Snapshot Signal is detected</li>
+   * </ul>
+   *
+   * @param latestSentryNotificationId The notification Id to check against 
the HMS
+   * @return True if a full snapshot is required; False otherwise.
+   * @throws Exception If an error occurs while checking the SentryStore or 
the HMS client.
+   */
+  private boolean isFullSnapshotRequired(long latestSentryNotificationId) 
throws Exception {
+    if (sentryStore.isHmsNotificationEmpty()) {
+      LOGGER.debug("Sentry Store has no HMS Notifications. Create Full HMS 
Snapshot. "
+          + "latest sentry notification Id = {}", latestSentryNotificationId);
+      return true;
+    }
+
+    // Once HDFS sync is enabled, and if MAuthzPathsSnapshotId
+    // table is still empty, we need to request a full snapshot
+    if(hdfsSyncEnabled && sentryStore.isAuthzPathsSnapshotEmpty()) {
+      LOGGER.debug("HDFSSync is enabled and MAuthzPathsSnapshotId table is 
empty. Need to request a full snapshot");
+      return true;
+    }
+
+    long currentHmsNotificationId = 
notificationFetcher.getCurrentNotificationId();
+    if (currentHmsNotificationId < latestSentryNotificationId) {
+      LOGGER.info("The current notification ID on HMS = {} is less than the 
latest processed Sentry "
+          + "notification ID = {}. Need to request a full HMS snapshot",
+          currentHmsNotificationId, latestSentryNotificationId);
+      return true;
+    }
+
+    // check if forced full update is required, reset update flag to false
+    // to only do it once per forced full update request.
+    if (fullUpdateHMS.compareAndSet(true, false)) {
+      LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot 
request");
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Checks if the HMS and Sentry processed notifications are out-of-sync.
+   * This could happen because the HMS did some clean-up of old notifications
+   * and Sentry was not requesting notifications during that time.
+   *
+   * @param events All new notifications to check for an out-of-sync.
+   * @param latestProcessedId The latest notification processed by Sentry to 
check against the
+   *        list of notifications events.
+   * @return True if an out-of-sync is found; False otherwise.
+   */
+  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> 
events,
+      long latestProcessedId) {
+    if (events.isEmpty()) {
+      return false;
+    }
+
+    /*
+     * If the sequence of notifications has a gap, then an out-of-sync might
+     * have happened due to the following issue:
+     *
+     * - HDFS sync was disabled or Sentry was shutdown for a time period 
longer than
+     * the HMS notification clean-up thread causing old notifications to be 
deleted.
+     *
+     * HMS notifications may contain both gaps in the sequence and duplicates
+     * (the same ID repeated more than once for different events).
+     *
+     * To accept duplicates (see NotificationFetcher for more info), then a 
gap is found
+     * if the 1st notification received is higher than the current ID 
processed + 1.
+     * i.e.
+     *   1st ID = 3, latest ID = 3 (duplicate found but no gap detected)
+     *   1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected)
+     *   1st ID = 5, latest ID = 3 (a gap is detected)
+     */
+
+    List<NotificationEvent> eventList = (List<NotificationEvent>) events;
+    long firstNotificationId = eventList.get(0).getEventId();
+
+    if (firstNotificationId > (latestProcessedId + 1)) {
+      LOGGER.info("First HMS event notification Id = {} is greater than latest 
Sentry processed"
+          + "notification Id = {} + 1. Need to request a full HMS snapshot.", 
firstNotificationId, latestProcessedId);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Request for full snapshot and persists it if there is no snapshot 
available in the sentry
+   * store. Also, wakes-up any waiting clients.
+   *
+   * @return ID of last notification processed.
+   * @throws Exception if there are failures
+   */
+  private long createFullSnapshot() throws Exception {
+    LOGGER.debug("Attempting to take full HMS snapshot");
+    
Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT,
+        SentryServiceState.FULL_UPDATE_RUNNING),
+        "HMSFollower shown loading full snapshot when it should not be.");
+    try {
+      // Set that the full update is running
+      SentryStateBank
+          .enableState(SentryServiceState.COMPONENT, 
SentryServiceState.FULL_UPDATE_RUNNING);
+
+      PathsImage snapshotInfo = client.getFullSnapshot();
+      if (snapshotInfo.getPathImage().isEmpty()) {
+        LOGGER.debug("Received empty path image from HMS while taking a full 
snapshot");
+        return snapshotInfo.getId();
+      }
+
+      // Check we're still the leader before persisting the new snapshot
+      if (!isLeader()) {
+        LOGGER.info("Not persisting full snapshot since not a leader");
+        return SentryStore.EMPTY_NOTIFICATION_ID;
+      }
+      try {
+        if (hdfsSyncEnabled) {
+          LOGGER.info("Persisting full snapshot for notification Id = {}", 
snapshotInfo.getId());
+          sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), 
snapshotInfo.getId());
+        } else {
+          // We need to persist latest notificationID for next poll
+          LOGGER.info("HDFSSync is disabled. Not Persisting full snapshot, "
+              + "but only setting last processed notification Id = {}", 
snapshotInfo.getId());
+          sentryStore.setLastProcessedNotificationID(snapshotInfo.getId());
+        }
+      } catch (Exception failure) {
+        LOGGER.error("Received exception while persisting HMS path full 
snapshot ");
+        throw failure;
+      }
+      // Wake up any HMS waiters that could have been put on hold before 
getting the
+      // eventIDBefore value.
+      wakeUpWaitingClientsForSync(snapshotInfo.getId());
+      // HMSFollower connected to HMS and it finished full snapshot if that 
was required
+      // Log this message only once
+      LOGGER.info("Sentry HMS support is ready");
+      return snapshotInfo.getId();
+    } catch(Exception failure) {
+      LOGGER.error("Received exception while creating HMS path full snapshot 
");
+      throw failure;
+    } finally {
+      SentryStateBank
+          .disableState(SentryServiceState.COMPONENT, 
SentryServiceState.FULL_UPDATE_RUNNING);
+    }
+  }
+
+  /**
+   * Process the collection of notifications and wake up any waiting clients.
+   * Also, persists the notification ID regardless of processing result.
+   *
+   * @param events list of events to be processed
+   * @throws Exception if the complete notification list is not processed 
because of JDO Exception
+   */
+  public void processNotifications(Collection<NotificationEvent> events) 
throws Exception {
+    boolean isNotificationProcessed;
+    if (events.isEmpty()) {
+      return;
+    }
+
+    for (NotificationEvent event : events) {
+      isNotificationProcessed = false;
+      try {
+        // Only the leader should process the notifications
+        if (!isLeader()) {
+          LOGGER.debug("Not processing notifications since not a leader");
+          return;
+        }
+        isNotificationProcessed = 
notificationProcessor.processNotificationEvent(event);
+      } catch (Exception e) {
+        if (e.getCause() instanceof JDODataStoreException) {
+          LOGGER.info("Received JDO Storage Exception, Could be because of 
processing "
+              + "duplicate notification");
+          if (event.getEventId() <= 
sentryStore.getLastProcessedNotificationID()) {
+            // Rest of the notifications need not be processed.
+            LOGGER.error("Received event with Id: {} which is smaller then the 
ID "
+                + "persisted in store", event.getEventId());
+            break;
+          }
+        } else {
+          LOGGER.error("Processing the notification with ID:{} failed with 
exception {}",
+              event.getEventId(), e);
+        }
+      }
+      if (!isNotificationProcessed) {
+        try {
+          // Update the notification ID in the persistent store even when the 
notification is
+          // not processed as the content in the notification is not valid.
+          // Continue processing the next notification.
+          LOGGER.debug("Explicitly Persisting Notification ID = {} ", 
event.getEventId());
+          sentryStore.persistLastProcessedNotificationID(event.getEventId());
+        } catch (Exception failure) {
+          LOGGER.error("Received exception while persisting the notification 
ID = {}", event.getEventId());
+          throw failure;
+        }
+      }
+      // Wake up any HMS waiters that are waiting for this ID.
+      wakeUpWaitingClientsForSync(event.getEventId());
+    }
+  }
+
+  /**
+   * Wakes up HMS waiters waiting for a specific event notification.<p>
+   *
+   * Verify that HMS image id didn't change since the last time we looked.
+   * If it did, it is possible that notifications jumped backward, so reset
+   * the counter to the current value.
+   *
+   * @param eventId Id of a notification
+   */
+  private void wakeUpWaitingClientsForSync(long eventId) {
+    CounterWait counterWait = sentryStore.getCounterWait();
+
+    LOGGER.debug("wakeUpWaitingClientsForSync: eventId = {}, hmsImageId = {}", 
eventId, hmsImageId);
+    // Wake up any HMS waiters that are waiting for this ID.
+    // counterWait should never be null, but tests mock SentryStore and a 
mocked one
+    // doesn't have it.
+    if (counterWait == null) {
+      return;
+    }
+
+    long lastHMSSnapshotId = hmsImageId;
+    try {
+      // Read actual HMS image ID
+      lastHMSSnapshotId = sentryStore.getLastProcessedImageID();
+      LOGGER.debug("wakeUpWaitingClientsForSync: lastHMSSnapshotId = {}", 
lastHMSSnapshotId);
+    } catch (Exception e) {
+      counterWait.update(eventId);
+      LOGGER.error("Failed to get the last processed HMS image id from sentry 
store");
+      return;
+    }
+
+    // Reset the counter if the persisted image ID is greater than current 
image ID
+    if (lastHMSSnapshotId > hmsImageId) {
+      counterWait.reset(eventId);
+      hmsImageId = lastHMSSnapshotId;
+      LOGGER.debug("wakeUpWaitingClientsForSync: reset counterWait with 
eventId = {}, new hmsImageId = {}", eventId, hmsImageId);
+    }
+
+    LOGGER.debug("wakeUpWaitingClientsForSync: update counterWait with eventId 
= {}, hmsImageId = {}", eventId, hmsImageId);
+    counterWait.update(eventId);
+  }
+
+  /**
+   * PubSub.Subscriber callback API
+   */
+  @Override
+  public void onMessage(PubSub.Topic topic, String message) {
+    Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, 
"Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS);
+    LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, 
message);
+    fullUpdateHMS.set(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
new file mode 100644
index 0000000..25a70bd
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import 
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.*;
+
+/**
+ * LeaderStatusMonitor participates in the distributed leader election protocol
+ * and allows clients to access the global leadership status.
+ * <p>
+ * LeaderStatusMonitor is a singleton that uses Curator framework via
+ * {@link HAContext}. The leadership status can be accessed via the
+ * {@link #isLeader()} method.<p>
+ *
+ * Usually leadership re-election is initiated by the Curator framework when 
one
+ * of the nodes disconnects from ZooKeeper, but LeaderStatusMonitor also 
supports
+ * voluntary release of the leadership via the {@link #deactivate()} method. 
This is
+ * intended to be used for debugging purposes.
+ * <p>
+ * The class also simulates leader election in non-HA environments. In such 
cases its
+ * {@link #isLeader()} method always returns True. The non-HA environment is 
determined
+ * by the absence of the SENTRY_HA_ZOOKEEPER_QUORUM in the configuration.
+ *
+ * <h2>Implementation notes</h2>
+ *
+ * <h3>Initialization</h3>
+ *
+ * Class initialization is split between the constructor and the {@link 
#init()} method.
+ * There are two reasons for it:
+ * <ul>
+ *     <li>We do not want to pass <strong>this</strong> reference to
+ *     {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
+ *     until it is fully initialized</li>
+ *     <li>We do not want to call {@link LeaderSelector#start()} method in 
constructor</li>
+ * </ul>
+ *
+ * Since LeaderStatusMonitor is a singleton and an instance can only be 
obtained via the
+ * {@link #getLeaderStatusMonitor(Configuration)} method, we hide this 
construction split
+ * from the callers.
+ *
+ * <h3>Synchronization</h3>
+ * Singleton synchronization is achieved using the synchronized class builder
+ * {@link #getLeaderStatusMonitor(Configuration)}
+ * <p>
+ * Upon becoming a leader, the code loops in {@link 
#takeLeadership(CuratorFramework)}
+ * until it receives a deactivation signal from {@link #deactivate()}. This is 
synchronized
+ * using a {@link #lock} and condition variable {@link #cond}.
+ * <p>
+ * Access to the leadership status {@link #isLeader} is also protected by the 
{@link #lock}.
+ * This isn't strictly necessary and a volatile field would be sufficient, but 
since we
+ * already use the {@link #lock} this is more straightforward.
+ */
+@ThreadSafe
+public final class LeaderStatusMonitor
+      extends LeaderSelectorListenerAdapter implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LeaderStatusMonitor.class);
+
+  private static final String LEADER_SELECTOR_SUFFIX = "leader";
+
+  /** Unique instance of the singleton object */
+  private static LeaderStatusMonitor leaderStatusMonitor = null;
+
+  private final HAContext haContext;
+
+  /** Unique string describing this instance */
+  private final String defaultIncarnationId = generateIncarnationId();
+  private String incarnationId;
+
+  /** True when not using ZooKeeper */
+  private final boolean isSingleNodeMode;
+
+  /** Lock and condition used to signal the leader to voluntarily release 
leadership */
+  private final Lock lock  = new ReentrantLock();
+  /** Condition variable used to synchronize voluntary leadership release */
+  private final Condition cond = lock.newCondition();
+  /** Leadership status - true if leader. */
+  private boolean isLeader = false;
+
+  /** Curator framework leader monitor */
+  private LeaderSelector leaderSelector = null;
+
+  /** The number of times this incarnation has become the leader. */
+  private final AtomicLong leaderCount = new AtomicLong(0);
+
+  /**
+   * Constructor. Initialize state and create HA context if configuration
+   * specifies ZooKeeper servers.
+   * @param conf Configuration. The fields we are interested in are:
+   *             <ul>
+   *             <li>SENTRY_HA_ZOOKEEPER_QUORUM</li>
+   *             </ul>
+   *             Configuration is also passed to the
+   *             {@link HAContext#newLeaderSelector(String, 
LeaderSelectorListener)}
+   *             which uses more properties.
+   * @throws Exception
+   */
+
+  @VisibleForTesting
+  protected LeaderStatusMonitor(Configuration conf) throws Exception {
+    // Only enable HA configuration if zookeeper is configured
+    String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, "");
+    if (zkServers.isEmpty()) {
+      isSingleNodeMode = true;
+      haContext = null;
+      isLeader = true;
+      incarnationId = "";
+      LOGGER.info("Leader election protocol disabled, assuming single active 
server");
+      return;
+    }
+    isSingleNodeMode = false;
+    incarnationId = defaultIncarnationId;
+    haContext = HAContext.getHAServerContext(conf);
+
+    LOGGER.info("Created LeaderStatusMonitor(incarnationId={}, "
+        + "zkServers='{}')", incarnationId, zkServers);
+  }
+
+  /**
+   * Tests may need to provide custom incarnation ID
+   * @param conf configuration
+   * @param incarnationId custom incarnation ID
+   * @throws Exception
+   */
+  @VisibleForTesting
+  protected LeaderStatusMonitor(Configuration conf, String incarnationId) 
throws Exception {
+    this(conf);
+    this.incarnationId = incarnationId;
+  }
+
+  /**
+   * Second half of the constructor links this object with {@link HAContext} 
and
+   * starts leader election protocol.
+   */
+  @VisibleForTesting
+  protected void init() {
+    if (isSingleNodeMode) {
+      return;
+    }
+
+    leaderSelector = haContext.newLeaderSelector("/" + LEADER_SELECTOR_SUFFIX, 
this);
+    leaderSelector.setId(incarnationId);
+    leaderSelector.autoRequeue();
+    leaderSelector.start();
+  }
+
+  /**
+   *
+   * @param conf Configuration. See {@link 
#LeaderStatusMonitor(Configuration)} for details.
+   * @return A global LeaderStatusMonitor instance.
+   * @throws Exception
+   */
+  @SuppressWarnings("LawOfDemeter")
+  public static synchronized LeaderStatusMonitor 
getLeaderStatusMonitor(Configuration conf)
+          throws Exception {
+    if (leaderStatusMonitor == null) {
+      leaderStatusMonitor = new LeaderStatusMonitor(conf);
+      leaderStatusMonitor.init();
+    }
+    return leaderStatusMonitor;
+  }
+
+  /**
+   * @return number of times this leader was elected. Used for metrics.
+   */
+  public long getLeaderCount() {
+    return leaderCount.get();
+  }
+
+  /**
+   * Shut down the LeaderStatusMonitor and wait for it to transition to
+   * standby.
+   */
+  @Override
+  public void close() {
+    if (leaderSelector != null) {
+      // Shut down our Curator hooks.
+      leaderSelector.close();
+    }
+  }
+
+  /**
+   * Deactivate the current client, if it is active.
+   * In non-HA case this is a no-op.
+   */
+  public void deactivate() {
+    if (isSingleNodeMode) {
+      return;
+    }
+    lock.lock();
+    try {
+      cond.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @return true iff we are the leader.
+   * In non-HA case always returns true
+   */
+  public boolean isLeader() {
+    if (isSingleNodeMode) {
+      return true;
+    }
+    lock.lock();
+    @SuppressWarnings("FieldAccessNotGuarded")
+    boolean leader = isLeader;
+    lock.unlock();
+    return leader;
+  }
+
+  /**
+   * Curator framework callback which is called when we become a leader.
+   * Should return only when we decide to resign.
+   */
+  @Override
+  public void takeLeadership(CuratorFramework client) throws Exception {
+    leaderCount.incrementAndGet();
+    LOGGER.info("Becoming leader in Sentry HA cluster:{}", this);
+    lock.lock();
+    try {
+      isLeader = true;
+      // Wait until we are interrupted or receive a signal
+      cond.await();
+    } catch (InterruptedException ignored) {
+      Thread.currentThread().interrupt();
+      LOGGER.error("takeLeadership call interrupted:" + this, ignored);
+    } finally {
+      isLeader = false;
+      lock.unlock();
+      LOGGER.info("Resigning from leader status in a Sentry HA cluster:{}", 
this);
+    }
+  }
+
+  /**
+   * Generate ID for the activator. <p>
+   *
+   * Ideally we would like something like host@pid, but Java doesn't provide a 
good
+   * way to determine pid value, so we use
+   * {@link RuntimeMXBean#getName()} which usually contains host
+   * name and pid.
+   */
+  private static String generateIncarnationId() {
+    return ManagementFactory.getRuntimeMXBean().getName();
+  }
+
+  @Override
+  public String toString() {
+    return isSingleNodeMode?"Leader election disabled":
+        String.format("{isSingleNodeMode=%b, incarnationId=%s, isLeader=%b, 
leaderCount=%d}",
+        isSingleNodeMode, incarnationId, isLeader, leaderCount);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
new file mode 100644
index 0000000..e755837
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
@@ -0,0 +1,773 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Timer.Context;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
+import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
+import org.apache.sentry.core.common.utils.PathUtils;
+import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.hdfs.SentryMalformedPathException;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
+import org.apache.sentry.service.thrift.SentryServiceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
+
+
+
+/**
+ * NotificationProcessor processes various notification events generated from
+ * the Hive MetaStore state change, and applies these changes to the complete
+ * HMS Paths snapshot or delta update stored in Sentry using SentryStore.
+ *
+ * <p>NotificationProcessor should not skip processing notification events for 
any reason.
+ * If some notification events are to be skipped, appropriate logic should be 
added in
+ * HMSFollower before invoking NotificationProcessor.
+ */
+final class NotificationProcessor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NotificationProcessor.class);
+  private final SentryStore sentryStore;
+  private final SentryJSONMessageDeserializer deserializer;
+  private final String authServerName;
+  // These variables can be updated even after object is instantiated, for 
testing purposes.
+  private boolean syncStoreOnCreate = false;
+  private boolean syncStoreOnDrop = false;
+  private final boolean hdfsSyncEnabled;
+
+  /**
+   * Configuring notification processor.
+   *
+   * @param sentryStore sentry backend store
+   * @param authServerName Server that sentry is authorizing
+   * @param conf sentry configuration
+   */
+  NotificationProcessor(SentryStore sentryStore, String authServerName,
+      Configuration conf) {
+    this.sentryStore = sentryStore;
+    deserializer = new SentryJSONMessageDeserializer();
+    this.authServerName = authServerName;
+    syncStoreOnCreate = Boolean
+        .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(),
+            AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault()));
+    syncStoreOnDrop = 
Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(),
+        AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault()));
+    hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabled(conf);
+  }
+
+  /**
+   * Split path into components on the "/" character.
+   * The path should not start with "/".
+   * This is consumed by Thrift interface, so the return result should be
+   * {@code List<String>}
+   *
+   * @param path input path e.g. {@code foo/bar}
+   * @return list of components, e.g. [foo, bar]
+   */
+  private static List<String> splitPath(String path) {
+    return Lists.newArrayList(PathUtils.splitPath(path));
+  }
+
+  /**
+   * Constructs permission update to be persisted for drop event that can be 
persisted
+   * from thrift object.
+   *
+   * @param authorizable thrift object that is dropped.
+   * @return update to be persisted
+   * @throws SentryInvalidInputException if the required fields are not set in 
argument provided
+   */
+  @VisibleForTesting
+  static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable)
+      throws SentryInvalidInputException {
+    PermissionsUpdate update = new 
PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+    String authzObj = SentryServiceUtil.getAuthzObj(authorizable);
+    update.addPrivilegeUpdate(authzObj)
+        .putToDelPrivileges(PermissionsUpdate.ALL_ROLES, 
PermissionsUpdate.ALL_ROLES);
+    return update;
+  }
+
+  @VisibleForTesting
+  String getAuthServerName() {
+    return authServerName;
+  }
+
+  /**
+   * Constructs permission update to be persisted for rename event that can be 
persisted from thrift
+   * object.
+   *
+   * @param oldAuthorizable old thrift object
+   * @param newAuthorizable new thrift object
+   * @return update to be persisted
+   * @throws SentryInvalidInputException if the required fields are not set in 
arguments provided
+   */
+  @VisibleForTesting
+  static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable,
+      TSentryAuthorizable newAuthorizable)
+      throws SentryInvalidInputException {
+    String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable);
+    String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable);
+    PermissionsUpdate update = new 
PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+    TPrivilegeChanges privUpdate = 
update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+    privUpdate.putToAddPrivileges(newAuthz, newAuthz);
+    privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
+    return update;
+  }
+
+  /**
+   * This function is only used for testing purposes.
+   *
+   * @param value to be set
+   */
+  @SuppressWarnings("SameParameterValue")
+  @VisibleForTesting
+  void setSyncStoreOnCreate(boolean value) {
+    syncStoreOnCreate = value;
+  }
+
+  /**
+   * This function is only used for testing purposes.
+   *
+   * @param value to be set
+   */
+  @SuppressWarnings("SameParameterValue")
+  @VisibleForTesting
+  void setSyncStoreOnDrop(boolean value) {
+    syncStoreOnDrop = value;
+  }
+
+  /**
+   * Processes the event and persist to sentry store.
+   *
+   * @param event to be processed
+   * @return true, if the event is persisted to sentry store. false, if the 
event is not persisted.
+   * @throws Exception if there is an error processing the event.
+   */
+  boolean processNotificationEvent(NotificationEvent event) throws Exception {
+    LOGGER
+        .debug("Processing event with id:{} and Type:{}", event.getEventId(), 
event.getEventType());
+
+    // Expose the time used for each request as a metric.
+    // We use lower-case version of the event name.
+    EventType eventType = EventType.valueOf(event.getEventType());
+    Timer timer = SentryMetrics
+        .getInstance()
+        .getTimer(name(HMSFollower.class, eventType.toString().toLowerCase()));
+
+    try (Context ignored = timer.time()) {
+      switch (eventType) {
+        case CREATE_DATABASE:
+          return processCreateDatabase(event);
+        case DROP_DATABASE:
+          return processDropDatabase(event);
+        case CREATE_TABLE:
+          return processCreateTable(event);
+        case DROP_TABLE:
+          return processDropTable(event);
+        case ALTER_TABLE:
+          return processAlterTable(event);
+        case ADD_PARTITION:
+          return processAddPartition(event);
+        case DROP_PARTITION:
+          return processDropPartition(event);
+        case ALTER_PARTITION:
+          return processAlterPartition(event);
+        default:
+          LOGGER.error("Notification with ID:{} has invalid event type: {}", 
event.getEventId(),
+              event.getEventType());
+          return false;
+      }
+    }
+  }
+
+  /**
+   * Processes "create database" notification event, and applies its 
corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processCreateDatabase(NotificationEvent event) throws 
Exception {
+    SentryJSONCreateDatabaseMessage message =
+        deserializer.getCreateDatabaseMessage(event.getMessage());
+    String dbName = message.getDB();
+    String location = message.getLocation();
+    if ((dbName == null) || (location == null)) {
+      LOGGER.error("Create database event "
+              + "has incomplete information. dbName: {} location: {}",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(location, "null"));
+      return false;
+    }
+
+    if (syncStoreOnCreate) {
+      dropSentryDbPrivileges(dbName, event);
+    }
+
+    if (hdfsSyncEnabled) {
+      List<String> locations = Collections.singletonList(location);
+      addPaths(dbName, locations, event);
+
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Processes "drop database" notification event, and applies its 
corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processDropDatabase(NotificationEvent event) throws 
Exception {
+    SentryJSONDropDatabaseMessage dropDatabaseMessage =
+        deserializer.getDropDatabaseMessage(event.getMessage());
+    String dbName = dropDatabaseMessage.getDB();
+    String location = dropDatabaseMessage.getLocation();
+    if (dbName == null) {
+      LOGGER.error("Drop database event has incomplete information: dbName = 
null");
+      return false;
+    }
+    if (syncStoreOnDrop) {
+      dropSentryDbPrivileges(dbName, event);
+    }
+
+    if (hdfsSyncEnabled) {
+      List<String> locations = Collections.singletonList(location);
+      removePaths(dbName, locations, event);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Processes "create table" notification event, and applies its corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processCreateTable(NotificationEvent event)
+      throws Exception {
+    SentryJSONCreateTableMessage createTableMessage = deserializer
+        .getCreateTableMessage(event.getMessage());
+    String dbName = createTableMessage.getDB();
+    String tableName = createTableMessage.getTable();
+    String location = createTableMessage.getLocation();
+    if ((dbName == null) || (tableName == null) || (location == null)) {
+      LOGGER.error(String.format("Create table event " + "has incomplete 
information."
+              + " dbName = %s, tableName = %s, location = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          StringUtils.defaultIfBlank(location, "null")));
+      return false;
+    }
+    if (syncStoreOnCreate) {
+      dropSentryTablePrivileges(dbName, tableName, event);
+    }
+
+    if (hdfsSyncEnabled) {
+      String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+      List<String> locations = Collections.singletonList(location);
+      addPaths(authzObj, locations, event);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Processes "drop table" notification event. It drops all partitions 
belonging to
+   * the table as well. And applies its corresponding snapshot change as well
+   * as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processDropTable(NotificationEvent event) throws Exception {
+    SentryJSONDropTableMessage dropTableMessage = deserializer
+        .getDropTableMessage(event.getMessage());
+    String dbName = dropTableMessage.getDB();
+    String tableName = dropTableMessage.getTable();
+    if ((dbName == null) || (tableName == null)) {
+      LOGGER.error("Drop table event "
+          + "has incomplete information. dbName: {}, tableName: {}",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"));
+      return false;
+    }
+    if (syncStoreOnDrop) {
+      dropSentryTablePrivileges(dbName, tableName, event);
+    }
+
+    if (hdfsSyncEnabled) {
+      String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+      removeAllPaths(authzObj, event);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Processes "alter table" notification event, and applies its corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   * <p>
+   * When the database or table name has changed, the Sentry privileges are renamed
+   * first; the path mapping is then renamed and/or relocated.
+   *
+   * @param event notification event to be processed.
+   * @return true if the path update was applied; false if HDFS sync is disabled, the
+   *         event is incomplete, nothing has changed, or renaming the privileges failed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processAlterTable(NotificationEvent event) throws Exception {
+    // NOTE(review): because of this early return, privileges are renamed only when
+    // HDFS sync is enabled - confirm this is intended.
+    if (!hdfsSyncEnabled) {
+      return false;
+    }
+
+    SentryJSONAlterTableMessage alterTableMessage =
+        deserializer.getAlterTableMessage(event.getMessage());
+    String oldDbName = alterTableMessage.getDB();
+    String oldTableName = alterTableMessage.getTable();
+    String newDbName = event.getDbName();
+    String newTableName = event.getTableName();
+    String oldLocation = alterTableMessage.getOldLocation();
+    String newLocation = alterTableMessage.getNewLocation();
+
+    if ((oldDbName == null)
+        || (oldTableName == null)
+        || (newDbName == null)
+        || (newTableName == null)
+        || (oldLocation == null)
+        || (newLocation == null)) {
+      LOGGER.error(String.format("Alter table event "
+              + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, "
+              + "newDbName = %s, newTableName = %s, newLocation = %s",
+          StringUtils.defaultIfBlank(oldDbName, "null"),
+          StringUtils.defaultIfBlank(oldTableName, "null"),
+          StringUtils.defaultIfBlank(oldLocation, "null"),
+          StringUtils.defaultIfBlank(newDbName, "null"),
+          StringUtils.defaultIfBlank(newTableName, "null"),
+          StringUtils.defaultIfBlank(newLocation, "null")));
+      return false;
+    }
+
+    // Names are compared ignoring case, the same way renameAuthzPath() compares the
+    // authz objects. A case-only difference with an unchanged location is therefore
+    // treated as "no change" instead of reaching renameAuthzPath(), which would reject it.
+    boolean nameChanged = !newDbName.equalsIgnoreCase(oldDbName)
+        || !oldTableName.equalsIgnoreCase(newTableName);
+
+    if (!nameChanged && oldLocation.equals(newLocation)) {
+      // A no-op alter is expected traffic, not an error.
+      LOGGER.debug("Alter table notification ignored as neither name nor "
+              + "location has changed: oldAuthzObj = {}.{}, oldLocation = {}, "
+              + "newAuthzObj = {}.{}, newLocation = {}",
+          oldDbName, oldTableName, oldLocation, newDbName, newTableName, newLocation);
+      return false;
+    }
+
+    if (nameChanged) {
+      try {
+        renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
+      } catch (SentryNoSuchObjectException e) {
+        LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:"
+            + " {}.{}", oldDbName, oldTableName);
+      } catch (Exception e) {
+        // The event is abandoned here, so report the failure as an error.
+        LOGGER.error("Could not process Alter table event. Event: {}", event.toString(), e);
+        return false;
+      }
+    }
+    String oldAuthzObj = oldDbName + "." + oldTableName;
+    String newAuthzObj = newDbName + "." + newTableName;
+    renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event);
+    return true;
+  }
+
+  /**
+   * Processes "add partition" notification event, and applies its corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @return true if the partition paths were added; false if HDFS sync is disabled
+   *         or the event is incomplete.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processAddPartition(NotificationEvent event)
+      throws Exception {
+    if (!hdfsSyncEnabled) {
+      return false;
+    }
+
+    SentryJSONAddPartitionMessage addPartitionMessage =
+        deserializer.getAddPartitionMessage(event.getMessage());
+    String dbName = addPartitionMessage.getDB();
+    String tableName = addPartitionMessage.getTable();
+    List<String> locations = addPartitionMessage.getLocations();
+    if ((dbName == null) || (tableName == null) || (locations == null)) {
+      // Report the event type actually being processed (add partition, not create table).
+      LOGGER.error(String.format("Add partition event has incomplete information. "
+              + "dbName = %s, tableName = %s, locations = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          locations != null ? locations.toString() : "null"));
+      return false;
+    }
+    String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+    addPaths(authzObj, locations, event);
+    return true;
+  }
+
+  /**
+   * Processes "drop partition" notification event, and applies its 
corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processDropPartition(NotificationEvent event)
+      throws Exception {
+    if (!hdfsSyncEnabled) {
+      return false;
+    }
+
+    SentryJSONDropPartitionMessage dropPartitionMessage =
+        deserializer.getDropPartitionMessage(event.getMessage());
+    String dbName = dropPartitionMessage.getDB();
+    String tableName = dropPartitionMessage.getTable();
+    List<String> locations = dropPartitionMessage.getLocations();
+    if ((dbName == null) || (tableName == null) || (locations == null)) {
+      LOGGER.error(String.format("Drop partition event "
+              + "has incomplete information. dbName = %s, tableName = %s, 
location = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          locations != null ? locations.toString() : "null"));
+      return false;
+    }
+    String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+    removePaths(authzObj, locations, event);
+    return true;
+  }
+
+  /**
+   * Processes "alter partition" notification event, and applies its corresponding
+   * snapshot change as well as delta path update into Sentry DB.
+   *
+   * @param event notification event to be processed.
+   * @return true if the partition location change was applied; false if HDFS sync is
+   *         disabled, the event is incomplete, or the location has not changed.
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private boolean processAlterPartition(NotificationEvent event) throws Exception {
+    if (!hdfsSyncEnabled) {
+      return false;
+    }
+
+    SentryJSONAlterPartitionMessage alterPartitionMessage =
+        deserializer.getAlterPartitionMessage(event.getMessage());
+    String dbName = alterPartitionMessage.getDB();
+    String tableName = alterPartitionMessage.getTable();
+    String oldLocation = alterPartitionMessage.getOldLocation();
+    String newLocation = alterPartitionMessage.getNewLocation();
+
+    if ((dbName == null)
+        || (tableName == null)
+        || (oldLocation == null)
+        || (newLocation == null)) {
+      LOGGER.error(String.format("Alter partition event "
+              + "has incomplete information. dbName = %s, tableName = %s, "
+              + "oldLocation = %s, newLocation = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          StringUtils.defaultIfBlank(oldLocation, "null"),
+          StringUtils.defaultIfBlank(newLocation, "null")));
+      return false;
+    }
+
+    if (oldLocation.equals(newLocation)) {
+      // Message fixed: "as location" (missing space) and "db.table" (was "db..table").
+      LOGGER.debug("Alter partition notification ignored as "
+          + "location has not changed: AuthzObj = {}.{}, Location = {}",
+          dbName, tableName, oldLocation);
+      return false;
+    }
+
+    // The partition stays in the same table, so old and new authz object are identical.
+    String oldAuthzObj = dbName + "." + tableName;
+    renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event);
+    return true;
+  }
+
+  /**
+   * Adds an authzObj along with a set of paths into the authzObj -> [Paths] mapping
+   * as well as persist the corresponding delta path change to Sentry DB.
+   * Locations that cannot be parsed into a path tree are skipped.
+   *
+   * @param authzObj the given authzObj
+   * @param locations a set of paths need to be added
+   * @param event the NotificationEvent object from where authzObj and locations were obtained
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private void addPaths(String authzObj, Collection<String> locations, NotificationEvent event)
+      throws Exception {
+    // AuthzObj is case insensitive
+    authzObj = authzObj.toLowerCase();
+
+    UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+    Collection<String> paths = new HashSet<>(locations.size());
+    // addPath and persist into Sentry DB.
+    // Skip update if encounter malformed path.
+    for (String location : locations) {
+      String pathTree = getPath(location);
+      if (pathTree == null) {
+        // Parameterized logging: no string is built unless DEBUG is enabled.
+        LOGGER.debug("HMS Path Update [OP : addPath, authzObj : {}, path : {}, "
+            + "notification event ID: {}] - nothing to add",
+            authzObj, location, event.getEventId());
+      } else {
+        LOGGER.debug("HMS Path Update [OP : addPath, authzObj : {}, path : {}, "
+            + "notification event ID: {}]",
+            authzObj, location, event.getEventId());
+        update.newPathChange(authzObj).addToAddPaths(splitPath(pathTree));
+        paths.add(pathTree);
+      }
+    }
+    sentryStore.addAuthzPathsMapping(authzObj, paths, update);
+  }
+
+  /**
+   * Removes a set of paths map to a given authzObj from the authzObj -> [Paths] mapping
+   * as well as persist the corresponding delta path change to Sentry DB.
+   * Locations that cannot be parsed into a path tree are skipped.
+   *
+   * @param authzObj the given authzObj
+   * @param locations a set of paths need to be removed
+   * @param event the NotificationEvent object from where authzObj and locations were obtained
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private void removePaths(String authzObj, Collection<String> locations,
+      NotificationEvent event) throws Exception {
+    // AuthzObj is case insensitive
+    authzObj = authzObj.toLowerCase();
+
+    UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+    Collection<String> paths = new HashSet<>(locations.size());
+    for (String location : locations) {
+      String pathTree = getPath(location);
+      if (pathTree == null) {
+        // Parameterized logging: no string is built unless DEBUG is enabled.
+        LOGGER.debug("HMS Path Update [OP : removePath, authzObj : {}, path : {}, "
+            + "notification event ID: {}] - nothing to remove",
+            authzObj, location, event.getEventId());
+      } else {
+        LOGGER.debug("HMS Path Update [OP : removePath, authzObj : {}, path : {}, "
+            + "notification event ID: {}]",
+            authzObj, location, event.getEventId());
+        update.newPathChange(authzObj).addToDelPaths(splitPath(pathTree));
+        paths.add(pathTree);
+      }
+    }
+    sentryStore.deleteAuthzPathsMapping(authzObj, paths, update);
+  }
+
+  /**
+   * Deletes an authzObj together with every path mapped to it from the
+   * authzObj -> [Paths] mapping, and persists the matching delta path
+   * change to Sentry DB.
+   *
+   * @param authzObj the given authzObj to be deleted
+   * @param event the NotificationEvent object from where authzObj was obtained
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private void removeAllPaths(String authzObj, NotificationEvent event)
+      throws Exception {
+    // AuthzObj is case insensitive
+    String normalizedAuthzObj = authzObj.toLowerCase();
+
+    String trace = "HMS Path Update [OP : removeAllPaths, authzObj : " + normalizedAuthzObj
+        + ", notification event ID: " + event.getEventId() + "]";
+    LOGGER.debug(trace);
+
+    // A single ALL_PATHS entry in the delete list stands for the whole object.
+    UniquePathsUpdate allPathsRemoval = new UniquePathsUpdate(event, false);
+    allPathsRemoval.newPathChange(normalizedAuthzObj)
+        .addToDelPaths(Lists.newArrayList(PathsUpdate.ALL_PATHS));
+    sentryStore.deleteAllAuthzPathsMapping(normalizedAuthzObj, allPathsRemoval);
+  }
+
+  /**
+   * Renames a given authzObj and alter the paths belongs to it from the
+   * authzObj -> [Paths] mapping as well as persist the corresponding
+   * delta path change to Sentry DB.
+   * <p>
+   * Three cases are handled: only the name changed, both name and location changed,
+   * and only the location changed. When one of the two locations cannot be parsed,
+   * only the valid side is applied through updateAuthzPathsMapping().
+   *
+   * @param oldAuthzObj the existing authzObj
+   * @param newAuthzObj the new name to be changed to
+   * @param oldLocation a existing path of the given authzObj
+   * @param newLocation a new path to be changed to
+   * @param event the NotificationEvent object from where authzObj and locations were obtained
+   * @throws SentryInvalidHMSEventException if neither the name nor the location has changed
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation,
+      String newLocation, NotificationEvent event) throws Exception {
+    // AuthzObj is case insensitive
+    oldAuthzObj = oldAuthzObj.toLowerCase();
+    newAuthzObj = newAuthzObj.toLowerCase();
+    String oldPathTree = getPath(oldLocation);
+    String newPathTree = getPath(newLocation);
+
+    // Parameterized logging: no string is built unless DEBUG is enabled.
+    LOGGER.debug("HMS Path Update [OP : renameAuthzObject, oldAuthzObj : {}, newAuthzObj : {}, "
+        + "oldLocation : {}, newLocation : {}, notification event ID: {}]",
+        oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId());
+
+    // In the case of HiveObj name has changed
+    if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) {
+      // Skip update if encounter malformed path for both oldLocation and newLocation.
+      if ((oldPathTree != null) && (newPathTree != null)) {
+        UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+        update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+        update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+        if (oldLocation.equals(newLocation)) {
+          //Only name has changed
+          // - Alter table rename for an external table
+          sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update);
+        } else {
+          // Both name and location has changed
+          // - Alter table rename for managed table
+          sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree,
+              newPathTree, update);
+        }
+      } else {
+        updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event);
+      }
+    } else if (!oldLocation.equals(newLocation)) {
+      // Only Location has changed, e.g. Alter table set location
+      if ((oldPathTree != null) && (newPathTree != null)) {
+        UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+        update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+        update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
+        sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
+            newPathTree, update);
+      } else {
+        updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event);
+      }
+    } else {
+      // Neither name nor location differs: the event carries nothing to apply.
+      LOGGER.error("Update Notification for Authorizable object {}, with no change, skipping",
+          oldAuthzObj);
+      throw new SentryInvalidHMSEventException("Update Notification for Authorizable object "
+          + "with no change");
+    }
+  }
+
+  /**
+   * Applies a one-sided path change and persists it to Sentry DB: when the old path
+   * tree is available it is deleted from oldAuthzObj; otherwise, when the new path
+   * tree is available it is added to newAuthzObj. Nothing is done when both are null.
+   *
+   * @param oldAuthzObj the authzObj the old path belongs to
+   * @param oldPathTree the old path tree, or null if it is not available
+   * @param newAuthzObj the authzObj the new path belongs to
+   * @param newPathTree the new path tree, or null if it is not available
+   * @param event the NotificationEvent object the change was obtained from
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree,
+      String newAuthzObj, String newPathTree, NotificationEvent event) throws Exception {
+    if (oldPathTree != null) {
+      // Only the old side is usable: drop it.
+      UniquePathsUpdate removal = new UniquePathsUpdate(event, false);
+      removal.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+      sentryStore.deleteAuthzPathsMapping(
+          oldAuthzObj, Collections.singleton(oldPathTree), removal);
+      return;
+    }
+
+    if (newPathTree == null) {
+      return;
+    }
+
+    // Only the new side is usable: register it.
+    UniquePathsUpdate addition = new UniquePathsUpdate(event, false);
+    addition.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+    sentryStore.addAuthzPathsMapping(
+        newAuthzObj, Collections.singleton(newPathTree), addition);
+  }
+
+  /**
+   * Parses a path into its path tree. A malformed path (one for which
+   * SentryMalformedPathException is raised) is logged and reported as null.
+   *
+   * @param path a path
+   * @return the path tree of the given path, or null if the path is malformed.
+   */
+  private String getPath(String path) {
+    String pathTree = null;
+    try {
+      pathTree = PathsUpdate.parsePath(path);
+    } catch (SentryMalformedPathException e) {
+      LOGGER.error("Unexpected path while parsing {}", path, e);
+    }
+    return pathTree;
+  }
+
+  /**
+   * Drops the Sentry privileges granted on a database. Failures are logged and
+   * not propagated to the caller.
+   *
+   * @param dbName name of the database whose privileges are dropped
+   * @param event the NotificationEvent that triggered the drop, used for logging only
+   */
+  private void dropSentryDbPrivileges(String dbName, NotificationEvent event) {
+    try {
+      TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
+      authorizable.setDb(dbName);
+      sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
+    } catch (SentryNoSuchObjectException e) {
+      LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: {}",
+          dbName);
+    } catch (Exception e) {
+      // Message fixed: the original concatenation produced "event.Event: " without a space.
+      LOGGER.error("Could not process Drop database event. Event: {}", event.toString(), e);
+    }
+  }
+
+  private void dropSentryTablePrivileges(String dbName, String tableName,
+      NotificationEvent event) {
+    try {
+      TSentryAuthorizable authorizable = new 
TSentryAuthorizable(authServerName);
+      authorizable.setDb(dbName);
+      authorizable.setTable(tableName);
+      sentryStore.dropPrivilege(authorizable, 
getPermUpdatableOnDrop(authorizable));
+    } catch (SentryNoSuchObjectException e) {
+      LOGGER.info("Drop Sentry privilege ignored as there are no privileges on 
the table: {}.{}",
+          dbName, tableName);
+    } catch (Exception e) {
+      LOGGER.error("Could not process Drop table event. Event: " + 
event.toString(), e);
+    }
+  }
+
+  /**
+   * Moves the Sentry privileges of a table from its old name to its new name.
+   *
+   * @param oldDbName database of the table before the rename
+   * @param oldTableName table name before the rename
+   * @param newDbName database of the table after the rename
+   * @param newTableName table name after the rename
+   * @throws Exception if the privileges cannot be renamed in the Sentry store
+   */
+  private void renamePrivileges(String oldDbName, String oldTableName, String newDbName,
+      String newTableName) throws Exception {
+    // Authorizable describing the table under its previous name.
+    TSentryAuthorizable source = new TSentryAuthorizable(authServerName);
+    source.setDb(oldDbName);
+    source.setTable(oldTableName);
+
+    // Authorizable describing the table under its new name.
+    TSentryAuthorizable target = new TSentryAuthorizable(authServerName);
+    target.setDb(newDbName);
+    target.setTable(newTableName);
+
+    sentryStore.renamePrivilege(source, target, getPermUpdatableOnRename(source, target));
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 6c4631f..edea5b6 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -78,7 +78,6 @@ import org.apache.sentry.provider.db.service.thrift.TSentryMappingData;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap;
 import org.apache.sentry.provider.db.service.thrift.TSentryRole;
-import org.apache.sentry.service.thrift.CounterWait;
 import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;

Reply via email to