http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
index 6762de7..3a5ba46 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
@@ -15,82 +15,275 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.sentry.service.thrift;
 
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
+import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
+import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
 import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.PermissionsUpdate;
 import org.apache.sentry.hdfs.SentryMalformedPathException;
-import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 
 /**
  * NotificationProcessor processes various notification events generated from
- * the Hive MetaStore state change, and applies these changes on the complete
+ * the Hive MetaStore state change, and applies these changes to the complete
  * HMS Paths snapshot or delta update stored in Sentry using SentryStore.
- * <p>
- * NotificationProcessor should not skip processing notification events for 
any reason.
+ *
+ * <p>NotificationProcessor should not skip processing notification events for 
any reason.
  * If some notification events are to be skipped, appropriate logic should be 
added in
  * HMSFollower before invoking NotificationProcessor.
  */
-class NotificationProcessor {
+final class NotificationProcessor {
 
-  private final Logger LOGGER;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NotificationProcessor.class);
   private final SentryStore sentryStore;
+  private final SentryJSONMessageDeserializer deserializer;
+  private final String authServerName;
+  // These variables can be updated even after object is instantiated, for 
testing purposes.
+  private boolean syncStoreOnCreate = false;
+  private boolean syncStoreOnDrop = false;
 
-  NotificationProcessor(SentryStore sentryStore, Logger LOGGER) {
-    this.LOGGER = LOGGER;
+  /**
+   * Configuring notification processor.
+   *
+   * @param sentryStore sentry backend store
+   * @param authServerName Server that sentry is authorizing
+   * @param conf sentry configuration
+   */
+  NotificationProcessor(SentryStore sentryStore, String authServerName,
+      Configuration conf) {
     this.sentryStore = sentryStore;
+    this.deserializer = new SentryJSONMessageDeserializer();
+    this.authServerName = authServerName;
+    // Each sync flag is read from the configuration, falling back to its declared default.
+    String createSetting = conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(),
+        AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault());
+    this.syncStoreOnCreate = Boolean.parseBoolean(createSetting);
+    String dropSetting = conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(),
+        AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault());
+    this.syncStoreOnDrop = Boolean.parseBoolean(dropSetting);
+  }
+
+  /**
+   * Split path into components on the "/" character.
+   * The path should not start with "/".
+   * This is consumed by Thrift interface, so the return result should be
+   * {@code List<String>}
+   *
+   * @param path input path e.g. {@code foo/bar}
+   * @return list of components, e.g. [foo, bar]
+   */
+  private static List<String> splitPath(String path) {
+    // Break the path on "/" first, then copy the pieces into a List<String>.
+    String[] components = path.split("/");
+    return Lists.newArrayList(components);
+  }
+
+  /**
+   * Constructs the permission update to be persisted for a drop event, built from
+   * the given thrift object.
+   *
+   * @param authorizable thrift object that is dropped.
+   * @return update to be persisted
+   * @throws SentryInvalidInputException if the required fields are not set in the
+   *     argument provided
+   */
+  @VisibleForTesting
+  static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable)
+      throws SentryInvalidInputException {
+    PermissionsUpdate permUpdate = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+    String droppedAuthzObj = SentryServiceUtil.getAuthzObj(authorizable);
+    // Register a delete entry for the object, keyed and valued by ALL_ROLES.
+    TPrivilegeChanges privChanges = permUpdate.addPrivilegeUpdate(droppedAuthzObj);
+    privChanges.putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
+    return permUpdate;
+  }
+
+  /**
+   * Constructs the permission update to be persisted for a rename event, built from
+   * the given thrift objects.
+   *
+   * @param oldAuthorizable old thrift object
+   * @param newAuthorizable new thrift object
+   * @return update to be persisted
+   * @throws SentryInvalidInputException if the required fields are not set in the
+   *     arguments provided
+   */
+  @VisibleForTesting
+  static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable,
+      TSentryAuthorizable newAuthorizable)
+      throws SentryInvalidInputException {
+    // Resolve both names up front, old one first.
+    String previousName = SentryServiceUtil.getAuthzObj(oldAuthorizable);
+    String currentName = SentryServiceUtil.getAuthzObj(newAuthorizable);
+
+    PermissionsUpdate permUpdate = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+    // A rename is recorded under RENAME_PRIVS as "add the new name, delete the old name".
+    TPrivilegeChanges renameChanges = permUpdate.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+    renameChanges.putToAddPrivileges(currentName, currentName);
+    renameChanges.putToDelPrivileges(previousName, previousName);
+    return permUpdate;
+  }
+
+  /**
+   * This function is only used for testing purposes.
+   *
+   * @param value to be set
+   */
+  @SuppressWarnings("SameParameterValue")
+  @VisibleForTesting
+  void setSyncStoreOnCreate(boolean value) {
+    // Overrides the flag that the constructor read from the configuration.
+    this.syncStoreOnCreate = value;
+  }
+
+  /**
+   * This function is only used for testing purposes.
+   *
+   * @param value to be set
+   */
+  @SuppressWarnings("SameParameterValue")
+  @VisibleForTesting
+  void setSyncStoreOnDrop(boolean value) {
+    // Overrides the flag that the constructor read from the configuration.
+    this.syncStoreOnDrop = value;
+  }
+
+  /**
+   * Processes the event and persists it to the sentry store.
+   *
+   * @param event to be processed
+   * @return true, if the event is persisted to sentry store. false, if the 
event is not persisted.
+   * @throws Exception if there is an error processing the event.
+   */
+  boolean processNotificationEvent(NotificationEvent event) throws Exception {
+    final long eventId = event.getEventId();
+    final String typeName = event.getEventType();
+    LOGGER.debug("Processing event with id:{} and Type:{}", eventId, typeName);
+
+    // valueOf() throws IllegalArgumentException for a name that is not an EventType constant,
+    // so the default branch is reached only for constants that have no handler here.
+    final HCatEventMessage.EventType type = HCatEventMessage.EventType.valueOf(typeName);
+    switch (type) {
+      case CREATE_DATABASE:
+        return processCreateDatabase(event);
+      case DROP_DATABASE:
+        return processDropDatabase(event);
+      case CREATE_TABLE:
+        return processCreateTable(event);
+      case DROP_TABLE:
+        return processDropTable(event);
+      case ALTER_TABLE:
+        return processAlterTable(event);
+      case ADD_PARTITION:
+        return processAddPartition(event);
+      case DROP_PARTITION:
+        return processDropPartition(event);
+      case ALTER_PARTITION:
+        return processAlterPartition(event);
+      case INSERT:
+        // Insert events are recognized but nothing is persisted for them.
+        return false;
+      default:
+        LOGGER.error("Notification with ID:{} has invalid event type: {}", eventId, typeName);
+        return false;
+    }
   }
 
   /**
    * Processes "create database" notification event, and applies its 
corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param location database location
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processCreateDatabase(String dbName, String location, long seqNum) 
throws Exception {
+  private boolean processCreateDatabase(NotificationEvent event) throws Exception {
+    SentryJSONCreateDatabaseMessage createDbMessage =
+        deserializer.getCreateDatabaseMessage(event.getMessage());
+    String dbName = createDbMessage.getDB();
+    String location = createDbMessage.getLocation();
+    boolean complete = (dbName != null) && (location != null);
+    if (!complete) {
+      // Both the name and the location are needed to record the path.
+      LOGGER.error("Create database event has incomplete information. dbName: {} location: {}",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(location, "null"));
+      return false;
+    }
     List<String> locations = Collections.singletonList(location);
-    addPaths(dbName, locations, seqNum);
+    addPaths(dbName, locations, event.getEventId());
+    // With create-sync enabled, drop any Sentry privileges recorded for this database name.
+    if (syncStoreOnCreate) {
+      dropSentryDbPrivileges(dbName, event);
+    }
+    return true;
   }
 
   /**
    * Processes "drop database" notification event, and applies its 
corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param location database location
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processDropDatabase(String dbName, String location, long seqNum) throws 
Exception {
+  private boolean processDropDatabase(NotificationEvent event) throws Exception {
+    SentryJSONDropDatabaseMessage message =
+        deserializer.getDropDatabaseMessage(event.getMessage());
+    String dbName = message.getDB();
+    String location = message.getLocation();
+    // NOTE(review): only the name is validated here; location is passed on unchecked.
+    if (dbName == null) {
+      LOGGER.error("Drop database event has incomplete information: dbName = null");
+      return false;
+    }
+    // With drop-sync enabled, privileges are dropped before the path is removed.
+    if (syncStoreOnDrop) {
+      dropSentryDbPrivileges(dbName, event);
+    }
     List<String> locations = Collections.singletonList(location);
-    removePaths(dbName, locations, seqNum);
+    removePaths(dbName, locations, event.getEventId());
+    return true;
   }
 
   /**
    * Processes "create table" notification event, and applies its corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param tableName table name
-   * @param location table location
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processCreateTable(String dbName, String tableName, String location, 
long seqNum)
-        throws Exception {
-    String authzObj = dbName + "." + tableName;
+  private boolean processCreateTable(NotificationEvent event)
+      throws Exception {
+    SentryJSONCreateTableMessage message =
+        deserializer.getCreateTableMessage(event.getMessage());
+    String dbName = message.getDB();
+    String tableName = message.getTable();
+    String location = message.getLocation();
+    boolean incomplete = (dbName == null) || (tableName == null) || (location == null);
+    if (incomplete) {
+      String report = String.format(
+          "Create table event has incomplete information."
+              + " dbName = %s, tableName = %s, location = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          StringUtils.defaultIfBlank(location, "null"));
+      LOGGER.error(report);
+      return false;
+    }
+    // With create-sync enabled, privileges for this table name are dropped before the path is added.
+    if (syncStoreOnCreate) {
+      dropSentryTablePrivileges(dbName, tableName, event);
+    }
+    String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
     List<String> locations = Collections.singletonList(location);
-    addPaths(authzObj, locations, seqNum);
+    addPaths(authzObj, locations, event.getEventId());
+    return true;
   }
 
   /**
@@ -98,86 +291,185 @@ class NotificationProcessor {
    * the table as well. And applies its corresponding snapshot change as well
    * as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param tableName table name
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processDropTable(String dbName, String tableName, long seqNum) throws 
Exception {
-    String authzObj = dbName + "." + tableName;
-    removeAllPaths(authzObj, seqNum);
+  private boolean processDropTable(NotificationEvent event) throws Exception {
+    SentryJSONDropTableMessage message =
+        deserializer.getDropTableMessage(event.getMessage());
+    String dbName = message.getDB();
+    String tableName = message.getTable();
+    boolean complete = (dbName != null) && (tableName != null);
+    if (!complete) {
+      LOGGER.error("Drop table event has incomplete information. dbName: {}, tableName: {}",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"));
+      return false;
+    }
+    // With drop-sync enabled, privileges are dropped before the paths are removed.
+    if (syncStoreOnDrop) {
+      dropSentryTablePrivileges(dbName, tableName, event);
+    }
+    removeAllPaths(SentryServiceUtil.getAuthzObj(dbName, tableName), event.getEventId());
+    return true;
   }
 
   /**
    * Processes "alter table" notification event, and applies its corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param oldDbName old database name
-   * @param newDbName new database name
-   * @param oldTableName old table name
-   * @param newTableName new table name
-   * @param oldLocation old table location
-   * @param newLocation new table location
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processAlterTable(String oldDbName, String newDbName, String 
oldTableName,
-          String newTableName, String oldLocation, String newLocation, long 
seqNum)
-              throws Exception {
+  private boolean processAlterTable(NotificationEvent event) throws Exception {
+    SentryJSONAlterTableMessage alterTableMessage =
+        deserializer.getAlterTableMessage(event.getMessage());
+    // The old identity is read from the message body, the new one from the event itself.
+    String oldDbName = alterTableMessage.getDB();
+    String oldTableName = alterTableMessage.getTable();
+    String newDbName = event.getDbName();
+    String newTableName = event.getTableName();
+    String oldLocation = alterTableMessage.getOldLocation();
+    String newLocation = alterTableMessage.getNewLocation();
+
+    if ((oldDbName == null)
+        || (oldTableName == null)
+        || (newDbName == null)
+        || (newTableName == null)
+        || (oldLocation == null)
+        || (newLocation == null)) {
+      LOGGER.error(String.format("Alter table event "
+              + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, "
+              + "newDbName = %s, newTableName = %s, newLocation = %s",
+          StringUtils.defaultIfBlank(oldDbName, "null"),
+          StringUtils.defaultIfBlank(oldTableName, "null"),
+          StringUtils.defaultIfBlank(oldLocation, "null"),
+          StringUtils.defaultIfBlank(newDbName, "null"),
+          StringUtils.defaultIfBlank(newTableName, "null"),
+          StringUtils.defaultIfBlank(newLocation, "null")));
+      return false;
+    }
+
+    // Names are compared case-insensitively everywhere in this method. renameAuthzPath
+    // lower-cases both names, so a case-only difference with an unchanged location would
+    // otherwise slip past this guard and be rejected there as an update with no change.
+    boolean nameChanged = !newDbName.equalsIgnoreCase(oldDbName)
+        || !oldTableName.equalsIgnoreCase(newTableName);
+
+    if (!nameChanged && oldLocation.equals(newLocation)) {
+      // A no-op alter is not a failure, so it is reported at info level like the
+      // equivalent case for partitions.
+      LOGGER.info(String.format("Alter table notification ignored as neither name nor "
+              + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, "
+              + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation,
+          newDbName + "." + newTableName, newLocation));
+      return false;
+    }
+
+    if (nameChanged) {
+      try {
+        renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
+      } catch (SentryNoSuchObjectException e) {
+        LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:"
+            + " {}.{}", oldDbName, oldTableName);
+      } catch (Exception e) {
+        // A failed rename aborts the event, so it is reported as an error.
+        LOGGER.error("Could not process Alter table event. Event: {}", event, e);
+        return false;
+      }
+    }
     String oldAuthzObj = oldDbName + "." + oldTableName;
     String newAuthzObj = newDbName + "." + newTableName;
-    renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, seqNum);
+    renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId());
+    return true;
   }
 
   /**
    * Processes "add partition" notification event, and applies its 
corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param tableName table name
-   * @param locations partition locations
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processAddPartition(String dbName, String tableName,
-                           Collection<String> locations, long seqNum)
-        throws Exception {
-    String authzObj = dbName + "." + tableName;
-    addPaths(authzObj, locations, seqNum);
+  private boolean processAddPartition(NotificationEvent event)
+      throws Exception {
+    SentryJSONAddPartitionMessage addPartitionMessage =
+        deserializer.getAddPartitionMessage(event.getMessage());
+    String dbName = addPartitionMessage.getDB();
+    String tableName = addPartitionMessage.getTable();
+    List<String> locations = addPartitionMessage.getLocations();
+    if ((dbName == null) || (tableName == null) || (locations == null)) {
+      // Name the add-partition event in the message so the log entry points at this handler.
+      LOGGER.error(String.format("Add partition event has incomplete information. "
+              + "dbName = %s, tableName = %s, locations = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          locations != null ? locations.toString() : "null"));
+      return false;
+    }
+    String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+    addPaths(authzObj, locations, event.getEventId());
+    return true;
   }
 
   /**
    * Processes "drop partition" notification event, and applies its 
corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param tableName table name
-   * @param locations partition locations
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processDropPartition(String dbName, String tableName,
-                            Collection<String> locations, long seqNum)
-        throws Exception {
-    String authzObj = dbName + "." + tableName;
-    removePaths(authzObj, locations, seqNum);
+  private boolean processDropPartition(NotificationEvent event)
+      throws Exception {
+    SentryJSONDropPartitionMessage message =
+        deserializer.getDropPartitionMessage(event.getMessage());
+    String dbName = message.getDB();
+    String tableName = message.getTable();
+    List<String> locations = message.getLocations();
+    boolean incomplete = (dbName == null) || (tableName == null) || (locations == null);
+    if (incomplete) {
+      // String.valueOf renders a null list as "null", otherwise it uses the list's toString().
+      String report = String.format(
+          "Drop partition event has incomplete information. dbName = %s, tableName = %s, "
+              + "location = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          String.valueOf(locations));
+      LOGGER.error(report);
+      return false;
+    }
+    removePaths(SentryServiceUtil.getAuthzObj(dbName, tableName), locations, event.getEventId());
+    return true;
   }
 
   /**
    * Processes "alter partition" notification event, and applies its 
corresponding
    * snapshot change as well as delta path update into Sentry DB.
    *
-   * @param dbName database name
-   * @param tableName table name
-   * @param oldLocation old partition location
-   * @param newLocation new partition location
-   * @param seqNum notification event ID
+   * @param event notification event to be processed.
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processAlterPartition(String dbName, String tableName, String 
oldLocation,
-        String newLocation, long seqNum) throws Exception {
+  private boolean processAlterPartition(NotificationEvent event) throws Exception {
+    SentryJSONAlterPartitionMessage alterPartitionMessage =
+        deserializer.getAlterPartitionMessage(event.getMessage());
+    String dbName = alterPartitionMessage.getDB();
+    String tableName = alterPartitionMessage.getTable();
+    String oldLocation = alterPartitionMessage.getOldLocation();
+    String newLocation = alterPartitionMessage.getNewLocation();
+
+    if ((dbName == null)
+        || (tableName == null)
+        || (oldLocation == null)
+        || (newLocation == null)) {
+      LOGGER.error(String.format("Alter partition event "
+              + "has incomplete information. dbName = %s, tableName = %s, "
+              + "oldLocation = %s, newLocation = %s",
+          StringUtils.defaultIfBlank(dbName, "null"),
+          StringUtils.defaultIfBlank(tableName, "null"),
+          StringUtils.defaultIfBlank(oldLocation, "null"),
+          StringUtils.defaultIfBlank(newLocation, "null")));
+      return false;
+    }
+
+    if (oldLocation.equals(newLocation)) {
+      // Nothing to persist when the partition location is unchanged. The object is
+      // printed as db.table, the same form used for the rename below.
+      LOGGER.info(String.format("Alter partition notification ignored as "
+          + "location has not changed: AuthzObj = %s, Location = %s",
+          dbName + "." + tableName, oldLocation));
+      return false;
+    }
+
     String oldAuthzObj = dbName + "." + tableName;
-    renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, seqNum);
+    renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event.getEventId());
+    return true;
   }
 
   /**
@@ -187,10 +479,9 @@ class NotificationProcessor {
    * @param authzObj the given authzObj
    * @param locations a set of paths need to be added
    * @param seqNum notification event ID
-   * @throws Exception
    */
   private void addPaths(String authzObj, Collection<String> locations, long 
seqNum)
-        throws Exception {
+      throws Exception {
     // AuthzObj is case insensitive
     authzObj = authzObj.toLowerCase();
 
@@ -201,13 +492,13 @@ class NotificationProcessor {
     for (String location : locations) {
       String pathTree = getPath(location);
       if (pathTree == null) {
-        LOGGER.debug("#### HMS Path Update ["
+        LOGGER.debug("HMS Path Update ["
             + "OP : addPath, "
             + "authzObj : " + authzObj + ", "
             + "path : " + location + "] - nothing to add" + ", "
             + "notification event ID: " + seqNum + "]");
       } else {
-        LOGGER.debug("#### HMS Path Update ["
+        LOGGER.debug("HMS Path Update ["
             + "OP : addPath, " + "authzObj : "
             + authzObj + ", "
             + "path : " + location + ", "
@@ -226,10 +517,9 @@ class NotificationProcessor {
    * @param authzObj the given authzObj
    * @param locations a set of paths need to be removed
    * @param seqNum notification event ID
-   * @throws Exception
    */
   private void removePaths(String authzObj, Collection<String> locations, long 
seqNum)
-        throws Exception {
+      throws Exception {
     // AuthzObj is case insensitive
     authzObj = authzObj.toLowerCase();
 
@@ -238,13 +528,13 @@ class NotificationProcessor {
     for (String location : locations) {
       String pathTree = getPath(location);
       if (pathTree == null) {
-        LOGGER.debug("#### HMS Path Update ["
+        LOGGER.debug("HMS Path Update ["
             + "OP : removePath, "
             + "authzObj : " + authzObj + ", "
             + "path : " + location + "] - nothing to remove" + ", "
             + "notification event ID: " + seqNum + "]");
       } else {
-        LOGGER.debug("#### HMS Path Update ["
+        LOGGER.debug("HMS Path Update ["
             + "OP : removePath, "
             + "authzObj : " + authzObj + ", "
             + "path : " + location + ", "
@@ -263,14 +553,13 @@ class NotificationProcessor {
    *
    * @param authzObj the given authzObj to be deleted
    * @param seqNum notification event ID
-   * @throws Exception
    */
   private void removeAllPaths(String authzObj, long seqNum)
-        throws Exception {
+      throws Exception {
     // AuthzObj is case insensitive
     authzObj = authzObj.toLowerCase();
 
-    LOGGER.debug("#### HMS Path Update ["
+    LOGGER.debug("HMS Path Update ["
         + "OP : removeAllPaths, "
         + "authzObj : " + authzObj + ", "
         + "notification event ID: " + seqNum + "]");
@@ -289,21 +578,19 @@ class NotificationProcessor {
    * @param newAuthzObj the new name to be changed to
    * @param oldLocation a existing path of the given authzObj
    * @param newLocation a new path to be changed to
-   * @param seqNum
-   * @throws Exception
    */
   private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String 
oldLocation,
-          String newLocation, long seqNum) throws Exception {
+      String newLocation, long seqNum) throws Exception {
     // AuthzObj is case insensitive
     oldAuthzObj = oldAuthzObj.toLowerCase();
     newAuthzObj = newAuthzObj.toLowerCase();
     String oldPathTree = getPath(oldLocation);
     String newPathTree = getPath(newLocation);
 
-    LOGGER.debug("#### HMS Path Update ["
+    LOGGER.debug("HMS Path Update ["
         + "OP : renameAuthzObject, "
         + "oldAuthzObj : " + oldAuthzObj + ", "
-        + "newAuthzObj : " + newAuthzObj   + ", "
+        + "newAuthzObj : " + newAuthzObj + ", "
         + "oldLocation : " + oldLocation + ", "
         + "newLocation : " + newLocation + ", "
         + "notification event ID: " + seqNum + "]");
@@ -323,20 +610,10 @@ class NotificationProcessor {
           // Both name and location has changed
           // - Alter table rename for managed table
           sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, 
oldPathTree,
-                  newPathTree, update);
+              newPathTree, update);
         }
-      } else if (oldPathTree != null) {
-        PathsUpdate update = new PathsUpdate(seqNum, false);
-        
update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
-        sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
-            Collections.singleton(oldPathTree),
-            update);
-      } else if (newPathTree != null) {
-        PathsUpdate update = new PathsUpdate(seqNum, false);
-        
update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
-        sentryStore.addAuthzPathsMapping(newAuthzObj,
-            Collections.singleton(newPathTree),
-            update);
+      } else {
+        updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, 
newPathTree, seqNum);
       }
     } else if (!oldLocation.equals(newLocation)) {
       // Only Location has changed, e.g. Alter table set location
@@ -346,27 +623,35 @@ class NotificationProcessor {
         
update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
         sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
             newPathTree, update);
-      } else if (oldPathTree != null) {
-        PathsUpdate update = new PathsUpdate(seqNum, false);
-        
update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
-        sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
-              Collections.singleton(oldPathTree),
-              update);
-      } else if (newPathTree != null) {
-        PathsUpdate update = new PathsUpdate(seqNum, false);
-        
update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
-        sentryStore.addAuthzPathsMapping(oldAuthzObj,
-              Collections.singleton(newPathTree),
-              update);
+      } else {
+        updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, 
newPathTree, seqNum);
       }
     } else {
       LOGGER.error("Update Notification for Auhorizable object {}, with no 
change, skipping",
-        oldAuthzObj);
-      throw new SentryInvalidHMSEventException("Update Notification for 
Authorizable object" +
-        "with no change");
+          oldAuthzObj);
+      throw new SentryInvalidHMSEventException("Update Notification for 
Authorizable object"
+          + "with no change");
     }
   }
 
+  /**
+   * Persists a one-sided path change: removes the old path when one is given,
+   * otherwise adds the new path when one is given. Does nothing when both are null.
+   *
+   * @param oldAuthzObj authz object the old path is removed from
+   * @param oldPathTree old path to remove, or null
+   * @param newAuthzObj authz object the new path is added to
+   * @param newPathTree new path to add, or null
+   * @param seqNum notification event ID
+   * @throws Exception if encounters errors while persisting the path change
+   */
+  private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree,
+      String newAuthzObj, String newPathTree, long seqNum) throws Exception {
+    if (oldPathTree != null) {
+      // An old path takes precedence; the new path is not looked at in this case.
+      PathsUpdate removal = new PathsUpdate(seqNum, false);
+      removal.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+      sentryStore.deleteAuthzPathsMapping(oldAuthzObj, Collections.singleton(oldPathTree),
+          removal);
+      return;
+    }
+    if (newPathTree == null) {
+      return;
+    }
+    PathsUpdate addition = new PathsUpdate(seqNum, false);
+    addition.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+    sentryStore.addAuthzPathsMapping(newAuthzObj, Collections.singleton(newPathTree), addition);
+  }
+
   /**
    * Get path tree from a given path. It return null if encounters
    * SentryMalformedPathException which indicates a malformed path.
@@ -383,15 +668,45 @@ class NotificationProcessor {
     return null;
   }
 
-  /**
-   * Split path into components on the "/" character.
-   * The path should not start with "/".
-   * This is consumed by Thrift interface, so the return result should be
-   * {@code List<String>}
-   * @param path input oath e.g. {@code foo/bar}
-   * @return list of commponents, e.g. [foo, bar]
-   */
-  private List<String> splitPath(String path) {
-    return (Lists.newArrayList(path.split("/")));
+  private void dropSentryDbPrivileges(String dbName, NotificationEvent event) {
+    try {
+      TSentryAuthorizable authorizable = new 
TSentryAuthorizable(authServerName);
+      authorizable.setDb(dbName);
+      sentryStore.dropPrivilege(authorizable, 
getPermUpdatableOnDrop(authorizable));
+    } catch (SentryNoSuchObjectException e) {
+      LOGGER.info("Drop Sentry privilege ignored as there are no privileges on 
the database: {}",
+          dbName);
+    } catch (Exception e) {
+      LOGGER.error("Could not process Drop database event." + "Event: " + 
event.toString(), e);
+    }
+  }
+
+  private void dropSentryTablePrivileges(String dbName, String tableName,
+      NotificationEvent event) {
+    try {
+      TSentryAuthorizable authorizable = new 
TSentryAuthorizable(authServerName);
+      authorizable.setDb(dbName);
+      authorizable.setTable(tableName);
+      sentryStore.dropPrivilege(authorizable, 
getPermUpdatableOnDrop(authorizable));
+    } catch (SentryNoSuchObjectException e) {
+      LOGGER.info("Drop Sentry privilege ignored as there are no privileges on 
the table: {}.{}",
+          dbName, tableName);
+    } catch (Exception e) {
+      LOGGER.error("Could not process Drop table event. Event: " + 
event.toString(), e);
+    }
+  }
+
+  private void renamePrivileges(String oldDbName, String oldTableName, String 
newDbName,
+      String newTableName) throws
+      Exception {
+    TSentryAuthorizable oldAuthorizable = new 
TSentryAuthorizable(authServerName);
+    oldAuthorizable.setDb(oldDbName);
+    oldAuthorizable.setTable(oldTableName);
+    TSentryAuthorizable newAuthorizable = new 
TSentryAuthorizable(authServerName);
+    newAuthorizable.setDb(newDbName);
+    newAuthorizable.setTable(newTableName);
+    Update update =
+        getPermUpdatableOnRename(oldAuthorizable, newAuthorizable);
+    sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
new file mode 100644
index 0000000..05518e8
--- /dev/null
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -0,0 +1,244 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Timer.Context;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.provider.db.service.persistent.PathsImage;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
+
+import org.apache.thrift.TException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper class for <Code>HiveMetaStoreClient</Code>
+ *
+ * <p>Abstracts communication with HMS and exposes APIs to connect/disconnect to HMS and to
+ * request HMS snapshots and also for new notifications.
+ */
+final class SentryHMSClient implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SentryHMSClient.class);
+  private final Configuration conf;
+  private HiveMetaStoreClient client = null;
+  private HiveConnectionFactory hiveConnectionFactory;
+
+  private static final String SNAPSHOT = "snapshot";
+  /** Measures time to get full snapshot. */
+  private final Timer updateTimer = SentryMetrics.getInstance()
+      .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
+  /** Number of times update failed. */
+  private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
+      .getCounter(name(FullUpdateInitializer.class, "failed"));
+
+  SentryHMSClient(Configuration conf, HiveConnectionFactory 
hiveConnectionFactory) {
+    this.conf = conf;
+    this.hiveConnectionFactory = hiveConnectionFactory;
+  }
+
+  /**
+   * Used only for testing purposes.
+   *
+   * @param client HiveMetaStoreClient to be initialized
+   */
+  @VisibleForTesting
+  void setClient(HiveMetaStoreClient client) {
+    this.client = client;
+  }
+
+  /**
+   * Used to know if the client is connected to HMS
+   *
+   * @return true if the client is connected to HMS, false if the client is not connected.
+   */
+  boolean isConnected() {
+    return client != null;
+  }
+
+  /**
+   * Connects to HMS by creating HiveMetaStoreClient.
+   *
+   * @throws IOException          if could not establish connection
+   * @throws InterruptedException if connection was interrupted
+   * @throws MetaException        if other errors happened
+   */
+  void connect()
+      throws IOException, InterruptedException, MetaException {
+    if (client != null) {
+      return;
+    }
+    client = hiveConnectionFactory.connect().getClient();
+  }
+
+  /**
+   * Disconnects the HMS client.
+   */
+  public void disconnect() throws Exception {
+    try {
+      if (client != null) {
+        LOGGER.info("Closing the HMS client connection");
+        client.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("failed to close Hive Connection Factory", e);
+    } finally {
+      client = null;
+    }
+  }
+
+  /**
+   * Closes the HMS client.
+   *
+   * <p>This is similar to disconnect. As this class implements AutoCloseable, close should be
+   * implemented.
+   */
+  public void close() throws Exception {
+    disconnect();
+  }
+
+  /**
+   * Creates HMS full snapshot.
+   *
+   * @return Full path snapshot and the last notification id on success; an empty image otherwise
+   */
+  PathsImage getFullSnapshot() {
+    try {
+      if (client == null) {
+        LOGGER.error("Client is not connected to HMS");
+        return new PathsImage(Collections.<String, Set<String>>emptyMap(),
+            SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+      }
+
+      CurrentNotificationEventId eventIdBefore = client.getCurrentNotificationEventId();
+      Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
+      if (pathsFullSnapshot.isEmpty()) {
+        return new PathsImage(pathsFullSnapshot, SentryStore.EMPTY_NOTIFICATION_ID,
+            SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+      }
+
+      CurrentNotificationEventId eventIdAfter = client.getCurrentNotificationEventId();
+      LOGGER.info("NotificationID, Before Snapshot: {}, After Snapshot {}",
+          eventIdBefore.getEventId(), eventIdAfter.getEventId());
+      // To ensure point-in-time snapshot consistency, need to make sure
+      // there were no HMS updates while retrieving the snapshot. If there are updates, snapshot
+      // is discarded. New attempt will be made after 500 milliseconds when
+      // HMSFollower runs again.
+      if (!eventIdBefore.equals(eventIdAfter)) {
+        LOGGER.error("Snapshot discarded, updates to HMS data while snapshot is being taken. "
+            + "ID Before: {}. ID After: {}", eventIdBefore.getEventId(), eventIdAfter.getEventId());
+        return new PathsImage(Collections.<String, Set<String>>emptyMap(),
+            SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+      }
+
+      LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.",
+          eventIdAfter.getEventId());
+      // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
+      // lastProcessedNotificationID instead of getting it from persistent store.
+      return new PathsImage(pathsFullSnapshot, eventIdAfter.getEventId(),
+          SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+    } catch (TException failure) {
+      LOGGER.error("Failed to communicate to HMS", failure);
+      return new PathsImage(Collections.<String, Set<String>>emptyMap(),
+          SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+    }
+  }
+
+  /**
+   * Retrieve a Hive full snapshot from HMS.
+   *
+   * @return HMS snapshot. Snapshot consists of a mapping from auth object 
name to the set of paths
+   *     corresponding to that name.
+   */
+  private Map<String, Set<String>> fetchFullUpdate() {
+    LOGGER.info("Request full HMS snapshot");
+    try (FullUpdateInitializer updateInitializer =
+             new FullUpdateInitializer(hiveConnectionFactory, conf);
+         Context context = updateTimer.time()) {
+      Map<String, Set<String>> pathsUpdate = 
updateInitializer.getFullHMSSnapshot();
+      LOGGER.info("Obtained full HMS snapshot");
+      return pathsUpdate;
+    } catch (Exception ignored) {
+      failedSnapshotsCount.inc();
+      LOGGER.error("Snapshot created failed ", ignored);
+      return Collections.emptyMap();
+    }
+  }
+
+  /**
+   * Returns all HMS notifications with ID greater than the specified one.
+   *
+   * @param notificationId ID of the last notification that was processed.
+   * @return new events to be synced; empty if not connected or HMS has nothing newer
+   */
+  Collection<NotificationEvent> getNotifications(long notificationId) throws Exception {
+    if (client == null) {
+      LOGGER.error("Client is not connected to HMS");
+      return Collections.emptyList();
+    }
+    LOGGER.debug("Checking for notifications beyond {}", notificationId);
+    // HIVE-15761: Currently getNextNotification API may return an empty
+    // NotificationEventResponse causing TProtocolException.
+    // Workaround: Only processes the notification events newer than the last updated one.
+    CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
+    LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId());
+    if (eventId.getEventId() < notificationId) {
+      LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is"
+          + " wrong. Sentry will request a full Snapshot");
+      // TODO Path Mapping info should be cleared so that HMSFollower would request for full
+      // snapshot in the subsequent run.
+      return Collections.emptyList();
+    }
+
+    if (eventId.getEventId() == notificationId) {
+      return Collections.emptyList();
+    }
+
+    NotificationEventResponse response =
+        client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
+    if (response.isSetEvents()) {
+      LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}",
+          notificationId, response.getEvents().size());
+      return response.getEvents();
+    }
+
+    return Collections.emptyList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 6014a79..10d55dc 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -296,7 +296,6 @@ public class SentryService implements Callable, 
SigUtils.SigListener {
     hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new 
HiveConf());
     hiveConnectionFactory.init();
     hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, 
hiveConnectionFactory);
-
     long initDelay = 
conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS,
             ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT);
     long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS,
@@ -350,7 +349,7 @@ public class SentryService implements Callable, 
SigUtils.SigListener {
       try {
         // close connections
         hmsFollower.close();
-      } catch (RuntimeException ex) {
+      } catch (Exception ex) {
         LOGGER.error("HMSFollower.close() failed", ex);
       } finally {
         hmsFollower = null;

http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
index 9c3e485..5826766 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
@@ -30,9 +30,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
 import org.apache.sentry.core.common.utils.SentryConstants;
 import org.apache.sentry.core.common.utils.KeyValue;
 import org.apache.sentry.core.common.utils.PolicyFileConstants;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
 import org.apache.sentry.provider.db.service.thrift.TSentryGrantOption;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
 import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
@@ -219,8 +222,38 @@ public final class SentryServiceUtil {
     return hiveConf.get(METASTOREURIS.varname);
   }
 
+  /**
+   * Derives object name from database and table names by concatenating them
+   *
+   * @param authorizable for which the name is to be derived
+   * @return authorizable name
+   * @throws SentryInvalidInputException if argument provided does not have 
all the
+   *                                     required fields set.
+   */
+  public static String getAuthzObj(TSentryAuthorizable authorizable)
+    throws SentryInvalidInputException {
+    return getAuthzObj(authorizable.getDb(), authorizable.getTable());
+  }
+
+  /**
+   * Derives object name from database and table names by concatenating them
+   *
+   * @param dbName database name; rejected when SentryStore.isNULL reports it as missing
+   * @param tblName table name; when SentryStore.isNULL reports it missing only dbName is used
+   * @return authorizable name, lower-cased: {@code db} or {@code db.table}
+   * @throws SentryInvalidInputException if argument provided does not have all the
+   *                                     required fields set.
+   */
+  public static String getAuthzObj(String dbName, String tblName)
+    throws SentryInvalidInputException {
+    if (SentryStore.isNULL(dbName)) {
+      throw new SentryInvalidInputException("Invalid input, DB name is missing");
+    }
+    return SentryStore.isNULL(tblName) ? dbName.toLowerCase() :
+      (dbName + "." + tblName).toLowerCase();
+  }
+
   private SentryServiceUtil() {
     // Make constructor private to avoid instantiation
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 51f6c5d..a8ebf7c 100644
--- 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -2697,7 +2697,6 @@ public class TestSentryStore extends org.junit.Assert {
   @Test
   public void testRenameUpdateAfterReplacingANewPathsImage() throws Exception {
     Map<String, Set<String>> authzPaths = new HashMap<>();
-
     // First image to persist (this will be replaced later)
     authzPaths.put("db1.table1", 
Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1",
         "/user/hive/warehouse/db2.db/table1.2"));

http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index 66ad2a1..d67c162 100644
--- 
a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -16,75 +16,115 @@
  */
 package org.apache.sentry.service.thrift;
 
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
 import 
org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
 import org.apache.sentry.hdfs.Updateable;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
-
-import java.util.Arrays;
-
-import org.junit.Test;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.reset;
+import javax.security.auth.login.LoginException;
 
 public class TestHMSFollower {
-  SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
-  SentryStore sentryStore = Mockito.mock(SentryStore.class);
-  final static String hiveInstance = "server2";
 
+  private final static String hiveInstance = "server2";
+  private final static Configuration configuration = new Configuration();
+  private final SentryJSONMessageFactory messageFactory = new 
SentryJSONMessageFactory();
+  private final SentryStore sentryStore = Mockito.mock(SentryStore.class);
+  private static HiveSimpleConnectionFactory hiveConnectionFactory;
+
+  @BeforeClass
+  public static void setup() throws IOException, LoginException {
+    hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new 
HiveConf());
+    hiveConnectionFactory.init();
+    configuration.set("sentry.hive.sync.create", "true");
+  }
+
+  /**
+   * Constructs create database event and makes sure that appropriate sentry store APIs
+   * are invoked when the event is processed by hms follower.
+   *
+   * @throws Exception
+   */
   @Test
   public void testCreateDatabase() throws Exception {
     String dbName = "db1";
 
     // Create notification events
-    NotificationEvent notificationEvent = new NotificationEvent(1, 0, 
HCatEventMessage.EventType.CREATE_DATABASE.toString(),
-      messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, 
"hdfs:///db1", null)).toString());
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+        HCatEventMessage.EventType.CREATE_DATABASE.toString(),
+        messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, 
"hdfs:///db1", null))
+            .toString());
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
-
-    Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
-    hmsFollower.processNotificationEvents(events);
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.processNotifications(events);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
     authorizable.setDb("db1");
 
-    verify(sentryStore, times(0)).dropPrivilege(authorizable, 
HMSFollower.onDropSentryPrivilege(authorizable));
+    verify(sentryStore, times(1))
+        .dropPrivilege(authorizable, 
NotificationProcessor.getPermUpdatableOnDrop(authorizable));
   }
 
+  /**
+   * Constructs drop database event and makes sure that appropriate sentry store APIs
+   * are invoked when the event is processed by hms follower.
+   *
+   * @throws Exception
+   */
   @Test
   public void testDropDatabase() throws Exception {
     String dbName = "db1";
 
     // Create notification events
-    NotificationEvent notificationEvent = new NotificationEvent(1, 0, 
HCatEventMessage.EventType.DROP_DATABASE.toString(),
-      messageFactory.buildDropDatabaseMessage(new Database(dbName, null, 
"hdfs:///db1", null)).toString());
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+        HCatEventMessage.EventType.DROP_DATABASE.toString(),
+        messageFactory.buildDropDatabaseMessage(new Database(dbName, null, 
"hdfs:///db1", null))
+            .toString());
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
 
-    Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
-    hmsFollower.processNotificationEvents(events);
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.processNotifications(events);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
     authorizable.setDb("db1");
 
-    verify(sentryStore, times(1)).dropPrivilege(authorizable, 
HMSFollower.onDropSentryPrivilege(authorizable));
+    verify(sentryStore, times(1))
+        .dropPrivilege(authorizable, 
NotificationProcessor.getPermUpdatableOnDrop(authorizable));
   }
 
+  /**
+   * Constructs create table event and makes sure that appropriate sentry store APIs
+   * are invoked when the event is processed by hms follower.
+   *
+   * @throws Exception
+   */
   @Test
   public void testCreateTable() throws Exception {
     String dbName = "db1";
@@ -93,23 +133,33 @@ public class TestHMSFollower {
     // Create notification events
     StorageDescriptor sd = new StorageDescriptor();
     sd.setLocation("hdfs:///db1.db/table1");
-    NotificationEvent notificationEvent = new NotificationEvent(1, 0, 
HCatEventMessage.EventType.CREATE_TABLE.toString(),
-      messageFactory.buildCreateTableMessage(new Table(tableName, dbName, 
null, 0, 0, 0, sd, null, null, null, null, null)).toString());
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(
+            new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null))
+            .toString());
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
 
-    Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
-    hmsFollower.processNotificationEvents(events);
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.processNotifications(events);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
     authorizable.setDb("db1");
     authorizable.setTable(tableName);
 
-    verify(sentryStore, times(0)).dropPrivilege(authorizable, 
HMSFollower.onDropSentryPrivilege(authorizable));
+    verify(sentryStore, times(1))
+        .dropPrivilege(authorizable, 
NotificationProcessor.getPermUpdatableOnDrop(authorizable));
   }
 
+  /**
+   * Constructs drop table event and makes sure that appropriate sentry store APIs
+   * are invoked when the event is processed by hms follower.
+   *
+   * @throws Exception
+   */
   @Test
   public void testDropTable() throws Exception {
     String dbName = "db1";
@@ -118,23 +168,33 @@ public class TestHMSFollower {
     // Create notification events
     StorageDescriptor sd = new StorageDescriptor();
     sd.setLocation("hdfs:///db1.db/table1");
-    NotificationEvent notificationEvent = new NotificationEvent(1, 0, 
HCatEventMessage.EventType.DROP_TABLE.toString(),
-      messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 
0, 0, 0, sd, null, null, null, null, null)).toString());
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+        HCatEventMessage.EventType.DROP_TABLE.toString(),
+        messageFactory.buildDropTableMessage(
+            new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null))
+            .toString());
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
 
-    Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
-    hmsFollower.processNotificationEvents(events);
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.processNotifications(events);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
     authorizable.setDb("db1");
     authorizable.setTable(tableName);
 
-    verify(sentryStore, times(1)).dropPrivilege(authorizable, 
HMSFollower.onDropSentryPrivilege(authorizable));
+    verify(sentryStore, times(1))
+        .dropPrivilege(authorizable, 
NotificationProcessor.getPermUpdatableOnDrop(authorizable));
   }
 
+  /**
+   * Constructs rename table event and makes sure that appropriate sentry store APIs
+   * are invoked when the event is processed by hms follower.
+   *
+   * @throws Exception
+   */
   @Test
   public void testRenameTable() throws Exception {
     String dbName = "db1";
@@ -146,18 +206,20 @@ public class TestHMSFollower {
     // Create notification events
     StorageDescriptor sd = new StorageDescriptor();
     sd.setLocation("hdfs:///db1.db/table1");
-    NotificationEvent notificationEvent = new NotificationEvent(1, 0, 
HCatEventMessage.EventType.ALTER_TABLE.toString(),
-      messageFactory.buildAlterTableMessage(
-        new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null),
-        new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, 
null, null, null)).toString());
+    NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+        HCatEventMessage.EventType.ALTER_TABLE.toString(),
+        messageFactory.buildAlterTableMessage(
+            new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null),
+            new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, 
null, null, null))
+            .toString());
     notificationEvent.setDbName(newDbName);
     notificationEvent.setTableName(newTableName);
     List<NotificationEvent> events = new ArrayList<>();
     events.add(notificationEvent);
 
-    Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
-    hmsFollower.processNotificationEvents(events);
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.processNotifications(events);
 
     TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
     authorizable.setServer(hiveInstance);
@@ -169,11 +231,20 @@ public class TestHMSFollower {
     newAuthorizable.setDb(newDbName);
     newAuthorizable.setTable(newTableName);
 
-    verify(sentryStore, times(1)).renamePrivilege(authorizable, 
newAuthorizable, HMSFollower.onRenameSentryPrivilege(authorizable, 
newAuthorizable));
+    verify(sentryStore, times(1)).renamePrivilege(authorizable, 
newAuthorizable,
+        NotificationProcessor.getPermUpdatableOnRename(authorizable, 
newAuthorizable));
   }
 
 
   @Ignore
+  /**
+   * Constructs a bunch of events and passes them to the processor of hms follower. One of
+   * those is an alter partition event without actually changing anything (invalid event). Idea
+   * is to make sure that hms follower calls appropriate sentry store APIs for the events
+   * processed by hms follower after processing the invalid alter partition event.
+   *
+   * @throws Exception
+   */
   @Test
   public void testAlterPartitionWithInvalidEvent() throws Exception {
     String dbName = "db1";
@@ -181,35 +252,38 @@ public class TestHMSFollower {
     String tableName2 = "table2";
     long inputEventId = 1;
     List<NotificationEvent> events = new ArrayList<>();
-    NotificationEvent notificationEvent = null;
+    NotificationEvent notificationEvent;
     List<FieldSchema> partCols;
-    StorageDescriptor sd = null;
+    StorageDescriptor sd;
     
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+    //noinspection unchecked
     
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
 
     Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
-
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
     // Create a table
     sd = new StorageDescriptor();
     sd.setLocation("hdfs://db1.db/table1");
-    partCols = new ArrayList<FieldSchema>();
+    partCols = new ArrayList<>();
     partCols.add(new FieldSchema("ds", "string", ""));
-    Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null, null);
+    Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null,
+        null);
     notificationEvent = new NotificationEvent(inputEventId, 0,
-      HCatEventMessage.EventType.CREATE_TABLE.toString(),
-      messageFactory.buildCreateTableMessage(table).toString());
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(table).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName1);
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that addAuthzPathsMapping was invoked once to handle 
CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
     verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
     reset(sentryStore);
     events.clear();
@@ -218,21 +292,22 @@ public class TestHMSFollower {
     List<Partition> partitions = new ArrayList<>();
     StorageDescriptor invalidSd = new StorageDescriptor();
     invalidSd.setLocation(null);
-    Partition partition = new Partition(Arrays.asList("today"), dbName, 
tableName1,
-      0, 0, sd, null);
+    Partition partition = new Partition(Collections.singletonList("today"), 
dbName, tableName1,
+        0, 0, sd, null);
     partitions.add(partition);
     notificationEvent = new NotificationEvent(inputEventId, 0, 
EventType.ADD_PARTITION.toString(),
-      messageFactory.buildAddPartitionMessage(table, partitions).toString());
+        messageFactory.buildAddPartitionMessage(table, partitions).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName1);
     events.add(notificationEvent);
     inputEventId += 1;
     //Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that addAuthzPathsMapping was invoked once to handle 
ADD_PARTITION notification
     // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
     verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
     reset(sentryStore);
     events.clear();
@@ -241,13 +316,13 @@ public class TestHMSFollower {
     // This is an invalid event and should be processed by sentry store.
     // Event Id should be explicitly persisted using 
persistLastProcessedNotificationID
     notificationEvent = new NotificationEvent(inputEventId, 0, 
EventType.ALTER_PARTITION.toString(),
-      messageFactory.buildAlterPartitionMessage(partition, 
partition).toString());
+        messageFactory.buildAlterPartitionMessage(partition, 
partition).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName1);
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that persistLastProcessedNotificationID is invoked explicitly.
     verify(sentryStore, 
times(1)).persistLastProcessedNotificationID(inputEventId - 1);
     reset(sentryStore);
@@ -255,21 +330,21 @@ public class TestHMSFollower {
 
     // Create a alter notification with some actual change.
     sd = new StorageDescriptor();
-    sd.setLocation("hdfs://user/hive/wareshouse/db1.db/table1");
+    sd.setLocation("hdfs://user/hive/warehouse/db1.db/table1");
     Partition updatedPartition = new Partition(partition);
     updatedPartition.setSd(sd);
     notificationEvent = new NotificationEvent(inputEventId, 0, 
EventType.ALTER_PARTITION.toString(),
-      messageFactory.buildAlterPartitionMessage(partition, 
updatedPartition).toString());
+        messageFactory.buildAlterPartitionMessage(partition, 
updatedPartition).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName1);
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that updateAuthzPathsMapping was invoked once to handle 
ALTER_PARTITION
     // notification and persistLastProcessedNotificationID was not invoked.
     verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyString(), Mockito.anyString(), 
Mockito.any(Updateable.Update.class));
+        Mockito.anyString(), Mockito.anyString(), 
Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(inputEventId - 1);
     reset(sentryStore);
     events.clear();
@@ -277,24 +352,34 @@ public class TestHMSFollower {
     // Create a table
     sd = new StorageDescriptor();
     sd.setLocation("hdfs://db1.db/table2");
-    partCols = new ArrayList<FieldSchema>();
+    partCols = new ArrayList<>();
     partCols.add(new FieldSchema("ds", "string", ""));
-    Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null, null);
+    Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null,
+        null);
     notificationEvent = new NotificationEvent(inputEventId, 0,
-      HCatEventMessage.EventType.CREATE_TABLE.toString(),
-      messageFactory.buildCreateTableMessage(table1).toString());
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(table1).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName2);
     events.add(notificationEvent);
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that addAuthzPathsMapping was invoked once to handle 
CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
     verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
   }
 
+  /**
+   * Constructs a series of events and passes them to the HMS follower's processor. One of them is
+   * an alter table event that does not actually change anything (an invalid event). The idea is
+   * to make sure that the HMS follower still calls the appropriate sentry store APIs for the
+   * events it processes after the invalid alter table event.
+   *
+   * @throws Exception
+   */
   @Test
   public void testAlterTableWithInvalidEvent() throws Exception {
     String dbName = "db1";
@@ -302,61 +387,67 @@ public class TestHMSFollower {
     String tableName2 = "table2";
     long inputEventId = 1;
     List<NotificationEvent> events = new ArrayList<>();
-    NotificationEvent notificationEvent = null;
+    NotificationEvent notificationEvent;
     List<FieldSchema> partCols;
-    StorageDescriptor sd = null;
+    StorageDescriptor sd;
     
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+    //noinspection unchecked
     
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
 
     Configuration configuration = new Configuration();
-    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, 
hiveInstance);
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
 
     // Create a table
     sd = new StorageDescriptor();
     sd.setLocation("hdfs://db1.db/table1");
-    partCols = new ArrayList<FieldSchema>();
+    partCols = new ArrayList<>();
     partCols.add(new FieldSchema("ds", "string", ""));
-    Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null, null);
+    Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null,
+        null);
     notificationEvent = new NotificationEvent(inputEventId, 0,
-      HCatEventMessage.EventType.CREATE_TABLE.toString(),
-      messageFactory.buildCreateTableMessage(table).toString());
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(table).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName1);
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that addAuthzPathsMapping was invoked once to handle 
CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
     verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
     reset(sentryStore);
     events.clear();
 
-
-    // Create alter table notification with actuall changing anything.
+    // Create alter table notification with out actually changing anything.
     // This notification should not be processed by sentry server
     // Notification should be persisted explicitly
-    notificationEvent = new NotificationEvent(1, 0, 
HCatEventMessage.EventType.ALTER_TABLE.toString(),
-      messageFactory.buildAlterTableMessage(
-        new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null),
-        new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null)).toString());
+    notificationEvent = new NotificationEvent(1, 0,
+        HCatEventMessage.EventType.ALTER_TABLE.toString(),
+        messageFactory.buildAlterTableMessage(
+            new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null),
+            new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null))
+            .toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName1);
     events = new ArrayList<>();
     events.add(notificationEvent);
     inputEventId += 1;
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that renameAuthzObj and deleteAuthzPathsMapping were  not 
invoked
     // to handle CREATE_TABLE notification
     // and persistLastProcessedNotificationID is explicitly invoked
     verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), 
Mockito.anyString(),
-      Mockito.any(Updateable.Update.class));
+        Mockito.any(Updateable.Update.class));
+    //noinspection unchecked
     verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(),  Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
     reset(sentryStore);
     events.clear();
@@ -364,21 +455,78 @@ public class TestHMSFollower {
     // Create a table
     sd = new StorageDescriptor();
     sd.setLocation("hdfs://db1.db/table2");
-    partCols = new ArrayList<FieldSchema>();
+    partCols = new ArrayList<>();
     partCols.add(new FieldSchema("ds", "string", ""));
-    Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null, null);
+    Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, 
null, null, null,
+        null);
     notificationEvent = new NotificationEvent(inputEventId, 0,
-      HCatEventMessage.EventType.CREATE_TABLE.toString(),
-      messageFactory.buildCreateTableMessage(table1).toString());
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(table1).toString());
     notificationEvent.setDbName(dbName);
     notificationEvent.setTableName(tableName2);
     events.add(notificationEvent);
     // Process the notification
-    hmsFollower.processNotificationEvents(events);
+    hmsFollower.processNotifications(events);
     // Make sure that addAuthzPathsMapping was invoked once to handle 
CREATE_TABLE notification
     // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
     verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
-      Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+        Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
     verify(sentryStore, 
times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
   }
+
+  /**
+   * Constructs two events and passes them to the HMS follower's processor. The first one is a
+   * create table event without location information (an invalid event). The idea is to make sure
+   * that the HMS follower still calls the appropriate sentry store APIs for the event it
+   * processes after the invalid create table event.
+   *
+   * @throws Exception
+   */
+  public void testCreateTableAfterInvalidEvent() throws Exception {
+    String dbName = "db1";
+    String tableName = "table1";
+    long inputEventId = 1;
+
+    
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+    //noinspection unchecked
+    Mockito.doNothing().when(sentryStore)
+        .addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
+            Mockito.any(Updateable.Update.class));
+
+    // Create invalid notification event. The location of the storage 
descriptor is null, which is invalid for creating table
+    StorageDescriptor invalidSd = new StorageDescriptor();
+    invalidSd.setLocation(null);
+    NotificationEvent invalidNotificationEvent = new 
NotificationEvent(inputEventId, 0,
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(
+            new Table(tableName, dbName, null, 0, 0, 0, invalidSd, null, null, 
null, null, null))
+            .toString());
+
+    // Create valid notification event
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setLocation("hdfs://db1.db/table1");
+    inputEventId += 1;
+    NotificationEvent notificationEvent = new NotificationEvent(inputEventId, 
0,
+        HCatEventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(
+            new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, 
null, null))
+            .toString());
+    List<NotificationEvent> events = new ArrayList<>();
+    events.add(invalidNotificationEvent);
+    events.add(notificationEvent);
+
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.processNotifications(events);
+
+    // invalid event updates notification ID directly
+    verify(sentryStore, 
times(1)).persistLastProcessedNotificationID(inputEventId - 1);
+
+    // next valid event update path, which updates notification ID
+    //noinspection unchecked
+    verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), 
Mockito.anyCollection(),
+        Mockito.any(Updateable.Update.class));
+  }
 }

Reply via email to