http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java index 6762de7..a49d8c6 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java @@ -15,82 +15,275 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.sentry.service.thrift; +import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; +import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer; +import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; +import org.apache.sentry.core.common.exception.SentryInvalidInputException; +import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; import org.apache.sentry.hdfs.PathsUpdate; +import org.apache.sentry.hdfs.PermissionsUpdate; import org.apache.sentry.hdfs.SentryMalformedPathException; -import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; +import org.apache.sentry.hdfs.Updateable.Update; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; /** * NotificationProcessor processes various notification events generated from - * the Hive MetaStore state change, and applies these changes on the complete + * the Hive MetaStore state change, and applies these changes to the complete * HMS Paths snapshot or delta update stored in Sentry using SentryStore. - * <p> - * NotificationProcessor should not skip processing notification events for any reason. + * + * <p>NotificationProcessor should not skip processing notification events for any reason. * If some notification events are to be skipped, appropriate logic should be added in - * HMSFollower before invoking NotificationProcessor. + * HmsFollower before invoking NotificationProcessor. */ -class NotificationProcessor { +final class NotificationProcessor { - private final Logger LOGGER; + private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class); private final SentryStore sentryStore; + private final SentryJSONMessageDeserializer deserializer; + private final String authServerName; + // These variables can be updated even after object is instantiated, for testing purposes. + private boolean syncStoreOnCreate = false; + private boolean syncStoreOnDrop = false; - NotificationProcessor(SentryStore sentryStore, Logger LOGGER) { - this.LOGGER = LOGGER; + /** + * Configuring notification processor. + * + * @param sentryStore sentry backend store + * @param authServerName Server that sentry is authorizing + * @param conf sentry configuration + */ + NotificationProcessor(SentryStore sentryStore, String authServerName, + Configuration conf) { this.sentryStore = sentryStore; + deserializer = new SentryJSONMessageDeserializer(); + this.authServerName = authServerName; + syncStoreOnCreate = Boolean + .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(), + AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault())); + syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(), + AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault())); + } + + /** + * Split path into components on the "/" character. + * The path should not start with "/". + * This is consumed by Thrift interface, so the return result should be + * {@code List<String>} + * + * @param path input oath e.g. {@code foo/bar} + * @return list of components, e.g. [foo, bar] + */ + private static List<String> splitPath(String path) { + return (Lists.newArrayList(path.split("/"))); + } + + /** + * Constructs permission update to be persisted for drop event that can be persisted + * from thrift object. + * + * @param authorizable thrift object that is dropped. + * @return update to be persisted + * @throws SentryInvalidInputException if the required fields are set in argument provided + */ + @VisibleForTesting + static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable) + throws SentryInvalidInputException { + PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); + String authzObj = SentryServiceUtil.getAuthzObj(authorizable); + update.addPrivilegeUpdate(authzObj) + .putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); + return update; + } + + /** + * Constructs permission update to be persisted for rename event that can be persisted from thrift + * object. + * + * @param oldAuthorizable old thrift object + * @param newAuthorizable new thrift object + * @return update to be persisted + * @throws SentryInvalidInputException if the required fields are set in arguments provided + */ + @VisibleForTesting + static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable, + TSentryAuthorizable newAuthorizable) + throws SentryInvalidInputException { + String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable); + String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable); + PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); + TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); + privUpdate.putToAddPrivileges(newAuthz, newAuthz); + privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); + return update; + } + + /** + * This function is only used for testing purposes. + * + * @param value to be set + */ + @SuppressWarnings("SameParameterValue") + @VisibleForTesting + void setSyncStoreOnCreate(boolean value) { + syncStoreOnCreate = value; + } + + /** + * This function is only used for testing purposes. + * + * @param value to be set + */ + @SuppressWarnings("SameParameterValue") + @VisibleForTesting + void setSyncStoreOnDrop(boolean value) { + syncStoreOnDrop = value; + } + + /** + * Processes the event and persist to sentry store. + * + * @param event to be processed + * @return true, if the event is persisted to sentry store. false, if the event is not persisted. + * @throws Exception if there is an error processing the event. + */ + boolean processNotificationEvent(NotificationEvent event) throws Exception { + LOGGER + .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType()); + switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { + case CREATE_DATABASE: + return processCreateDatabase(event); + case DROP_DATABASE: + return processDropDatabase(event); + case CREATE_TABLE: + return processCreateTable(event); + case DROP_TABLE: + return processDropTable(event); + case ALTER_TABLE: + return processAlterTable(event); + case ADD_PARTITION: + return processAddPartition(event); + case DROP_PARTITION: + return processDropPartition(event); + case ALTER_PARTITION: + return processAlterPartition(event); + case INSERT: + return false; + default: + LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(), + event.getEventType()); + return false; + } } /** * Processes "create database" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param dbName database name - * @param location database location - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processCreateDatabase(String dbName, String location, long seqNum) throws Exception { + private boolean processCreateDatabase(NotificationEvent event) throws Exception { + SentryJSONCreateDatabaseMessage message = + deserializer.getCreateDatabaseMessage(event.getMessage()); + String dbName = message.getDB(); + String location = message.getLocation(); + if ((dbName == null) || (location == null)) { + LOGGER.error("Create database event " + + "has incomplete information. dbName: {} location: {}", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(location, "null")); + return false; + } List<String> locations = Collections.singletonList(location); - addPaths(dbName, locations, seqNum); + addPaths(dbName, locations, event.getEventId()); + if (syncStoreOnCreate) { + dropSentryDbPrivileges(dbName, event); + } + return true; } /** * Processes "drop database" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param dbName database name - * @param location database location - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processDropDatabase(String dbName, String location, long seqNum) throws Exception { + private boolean processDropDatabase(NotificationEvent event) throws Exception { + SentryJSONDropDatabaseMessage dropDatabaseMessage = + deserializer.getDropDatabaseMessage(event.getMessage()); + String dbName = dropDatabaseMessage.getDB(); + String location = dropDatabaseMessage.getLocation(); + if (dbName == null) { + LOGGER.error("Drop database event has incomplete information: dbName = null"); + return false; + } + if (syncStoreOnDrop) { + dropSentryDbPrivileges(dbName, event); + } List<String> locations = Collections.singletonList(location); - removePaths(dbName, locations, seqNum); + removePaths(dbName, locations, event.getEventId()); + return true; } /** * Processes "create table" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param dbName database name - * @param tableName table name - * @param location table location - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processCreateTable(String dbName, String tableName, String location, long seqNum) - throws Exception { - String authzObj = dbName + "." + tableName; + private boolean processCreateTable(NotificationEvent event) + throws Exception { + SentryJSONCreateTableMessage createTableMessage = deserializer + .getCreateTableMessage(event.getMessage()); + String dbName = createTableMessage.getDB(); + String tableName = createTableMessage.getTable(); + String location = createTableMessage.getLocation(); + if ((dbName == null) || (tableName == null) || (location == null)) { + LOGGER.error(String.format("Create table event " + "has incomplete information." + + " dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(location, "null"))); + return false; + } + if (syncStoreOnCreate) { + dropSentryTablePrivileges(dbName, tableName, event); + } + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); List<String> locations = Collections.singletonList(location); - addPaths(authzObj, locations, seqNum); + addPaths(authzObj, locations, event.getEventId()); + return true; } /** @@ -98,86 +291,185 @@ class NotificationProcessor { * the table as well. And applies its corresponding snapshot change as well * as delta path update into Sentry DB. * - * @param dbName database name - * @param tableName table name - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processDropTable(String dbName, String tableName, long seqNum) throws Exception { - String authzObj = dbName + "." + tableName; - removeAllPaths(authzObj, seqNum); + private boolean processDropTable(NotificationEvent event) throws Exception { + SentryJSONDropTableMessage dropTableMessage = deserializer + .getDropTableMessage(event.getMessage()); + String dbName = dropTableMessage.getDB(); + String tableName = dropTableMessage.getTable(); + if ((dbName == null) || (tableName == null)) { + LOGGER.error("Drop table event " + + "has incomplete information. dbName: {}, tableName: {}", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null")); + return false; + } + if (syncStoreOnDrop) { + dropSentryTablePrivileges(dbName, tableName, event); + } + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + removeAllPaths(authzObj, event.getEventId()); + return true; } /** * Processes "alter table" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param oldDbName old database name - * @param newDbName new database name - * @param oldTableName old table name - * @param newTableName new table name - * @param oldLocation old table location - * @param newLocation new table location - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processAlterTable(String oldDbName, String newDbName, String oldTableName, - String newTableName, String oldLocation, String newLocation, long seqNum) - throws Exception { + private boolean processAlterTable(NotificationEvent event) throws Exception { + SentryJSONAlterTableMessage alterTableMessage = + deserializer.getAlterTableMessage(event.getMessage()); + String oldDbName = alterTableMessage.getDB(); + String oldTableName = alterTableMessage.getTable(); + String newDbName = event.getDbName(); + String newTableName = event.getTableName(); + String oldLocation = alterTableMessage.getOldLocation(); + String newLocation = alterTableMessage.getNewLocation(); + + if ((oldDbName == null) + || (oldTableName == null) + || (newDbName == null) + || (newTableName == null) + || (oldLocation == null) + || (newLocation == null)) { + LOGGER.error(String.format("Alter table event " + + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " + + "newDbName = %s, newTableName = %s, newLocation = %s", + StringUtils.defaultIfBlank(oldDbName, "null"), + StringUtils.defaultIfBlank(oldTableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newDbName, "null"), + StringUtils.defaultIfBlank(newTableName, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + return false; + } + + if ((oldDbName.equals(newDbName)) + && (oldTableName.equals(newTableName)) + && (oldLocation.equals(newLocation))) { + LOGGER.error(String.format("Alter table notification ignored as neither name nor " + + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation, + newDbName + "." + newTableName, newLocation)); + return false; + } + + if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { + // Name has changed + try { + renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:" + + " {}.{}", oldDbName, oldTableName); + } catch (Exception e) { + LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e); + return false; + } + } String oldAuthzObj = oldDbName + "." + oldTableName; String newAuthzObj = newDbName + "." + newTableName; - renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, seqNum); + renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId()); + return true; } /** * Processes "add partition" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param dbName database name - * @param tableName table name - * @param locations partition locations - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processAddPartition(String dbName, String tableName, - Collection<String> locations, long seqNum) - throws Exception { - String authzObj = dbName + "." + tableName; - addPaths(authzObj, locations, seqNum); + private boolean processAddPartition(NotificationEvent event) + throws Exception { + SentryJSONAddPartitionMessage addPartitionMessage = + deserializer.getAddPartitionMessage(event.getMessage()); + String dbName = addPartitionMessage.getDB(); + String tableName = addPartitionMessage.getTable(); + List<String> locations = addPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + LOGGER.error(String.format("Create table event has incomplete information. " + + "dbName = %s, tableName = %s, locations = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + return false; + } + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + addPaths(authzObj, locations, event.getEventId()); + return true; } /** * Processes "drop partition" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param dbName database name - * @param tableName table name - * @param locations partition locations - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processDropPartition(String dbName, String tableName, - Collection<String> locations, long seqNum) - throws Exception { - String authzObj = dbName + "." + tableName; - removePaths(authzObj, locations, seqNum); + private boolean processDropPartition(NotificationEvent event) + throws Exception { + SentryJSONDropPartitionMessage dropPartitionMessage = + deserializer.getDropPartitionMessage(event.getMessage()); + String dbName = dropPartitionMessage.getDB(); + String tableName = dropPartitionMessage.getTable(); + List<String> locations = dropPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + LOGGER.error(String.format("Drop partition event " + + "has incomplete information. dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + return false; + } + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + removePaths(authzObj, locations, event.getEventId()); + return true; } /** * Processes "alter partition" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param dbName database name - * @param tableName table name - * @param oldLocation old partition location - * @param newLocation new partition location - * @param seqNum notification event ID + * @param event notification event to be processed. * @throws Exception if encounters errors while persisting the path change */ - void processAlterPartition(String dbName, String tableName, String oldLocation, - String newLocation, long seqNum) throws Exception { + private boolean processAlterPartition(NotificationEvent event) throws Exception { + SentryJSONAlterPartitionMessage alterPartitionMessage = + deserializer.getAlterPartitionMessage(event.getMessage()); + String dbName = alterPartitionMessage.getDB(); + String tableName = alterPartitionMessage.getTable(); + String oldLocation = alterPartitionMessage.getOldLocation(); + String newLocation = alterPartitionMessage.getNewLocation(); + + if ((dbName == null) + || (tableName == null) + || (oldLocation == null) + || (newLocation == null)) { + LOGGER.error(String.format("Alter partition event " + + "has incomplete information. dbName = %s, tableName = %s, " + + "oldLocation = %s, newLocation = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + return false; + } + + if (oldLocation.equals(newLocation)) { + LOGGER.info(String.format("Alter partition notification ignored as" + + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." + + "." + tableName, oldLocation)); + return false; + } + String oldAuthzObj = dbName + "." + tableName; - renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, seqNum); + renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event.getEventId()); + return true; } /** @@ -187,10 +479,9 @@ class NotificationProcessor { * @param authzObj the given authzObj * @param locations a set of paths need to be added * @param seqNum notification event ID - * @throws Exception */ private void addPaths(String authzObj, Collection<String> locations, long seqNum) - throws Exception { + throws Exception { // AuthzObj is case insensitive authzObj = authzObj.toLowerCase(); @@ -201,13 +492,13 @@ class NotificationProcessor { for (String location : locations) { String pathTree = getPath(location); if (pathTree == null) { - LOGGER.debug("#### HMS Path Update [" + LOGGER.debug("HMS Path Update [" + "OP : addPath, " + "authzObj : " + authzObj + ", " + "path : " + location + "] - nothing to add" + ", " + "notification event ID: " + seqNum + "]"); } else { - LOGGER.debug("#### HMS Path Update [" + LOGGER.debug("HMS Path Update [" + "OP : addPath, " + "authzObj : " + authzObj + ", " + "path : " + location + ", " @@ -226,10 +517,9 @@ class NotificationProcessor { * @param authzObj the given authzObj * @param locations a set of paths need to be removed * @param seqNum notification event ID - * @throws Exception */ private void removePaths(String authzObj, Collection<String> locations, long seqNum) - throws Exception { + throws Exception { // AuthzObj is case insensitive authzObj = authzObj.toLowerCase(); @@ -238,13 +528,13 @@ class NotificationProcessor { for (String location : locations) { String pathTree = getPath(location); if (pathTree == null) { - LOGGER.debug("#### HMS Path Update [" + LOGGER.debug("HMS Path Update [" + "OP : removePath, " + "authzObj : " + authzObj + ", " + "path : " + location + "] - nothing to remove" + ", " + "notification event ID: " + seqNum + "]"); } else { - LOGGER.debug("#### HMS Path Update [" + LOGGER.debug("HMS Path Update [" + "OP : removePath, " + "authzObj : " + authzObj + ", " + "path : " + location + ", " @@ -263,14 +553,13 @@ class NotificationProcessor { * * @param authzObj the given authzObj to be deleted * @param seqNum notification event ID - * @throws Exception */ private void removeAllPaths(String authzObj, long seqNum) - throws Exception { + throws Exception { // AuthzObj is case insensitive authzObj = authzObj.toLowerCase(); - LOGGER.debug("#### HMS Path Update [" + LOGGER.debug("HMS Path Update [" + "OP : removeAllPaths, " + "authzObj : " + authzObj + ", " + "notification event ID: " + seqNum + "]"); @@ -289,21 +578,19 @@ class NotificationProcessor { * @param newAuthzObj the new name to be changed to * @param oldLocation a existing path of the given authzObj * @param newLocation a new path to be changed to - * @param seqNum - * @throws Exception */ private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation, - String newLocation, long seqNum) throws Exception { + String newLocation, long seqNum) throws Exception { // AuthzObj is case insensitive oldAuthzObj = oldAuthzObj.toLowerCase(); newAuthzObj = newAuthzObj.toLowerCase(); String oldPathTree = getPath(oldLocation); String newPathTree = getPath(newLocation); - LOGGER.debug("#### HMS Path Update [" + LOGGER.debug("HMS Path Update [" + "OP : renameAuthzObject, " + "oldAuthzObj : " + oldAuthzObj + ", " - + "newAuthzObj : " + newAuthzObj + ", " + + "newAuthzObj : " + newAuthzObj + ", " + "oldLocation : " + oldLocation + ", " + "newLocation : " + newLocation + ", " + "notification event ID: " + seqNum + "]"); @@ -323,20 +610,10 @@ class NotificationProcessor { // Both name and location has changed // - Alter table rename for managed table sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree, - newPathTree, update); + newPathTree, update); } - } else if (oldPathTree != null) { - PathsUpdate update = new PathsUpdate(seqNum, false); - update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); - sentryStore.deleteAuthzPathsMapping(oldAuthzObj, - Collections.singleton(oldPathTree), - update); - } else if (newPathTree != null) { - PathsUpdate update = new PathsUpdate(seqNum, false); - update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); - sentryStore.addAuthzPathsMapping(newAuthzObj, - Collections.singleton(newPathTree), - update); + } else { + updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum); } } else if (!oldLocation.equals(newLocation)) { // Only Location has changed, e.g. Alter table set location @@ -346,27 +623,35 @@ class NotificationProcessor { update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree)); sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newPathTree, update); - } else if (oldPathTree != null) { - PathsUpdate update = new PathsUpdate(seqNum, false); - update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); - sentryStore.deleteAuthzPathsMapping(oldAuthzObj, - Collections.singleton(oldPathTree), - update); - } else if (newPathTree != null) { - PathsUpdate update = new PathsUpdate(seqNum, false); - update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree)); - sentryStore.addAuthzPathsMapping(oldAuthzObj, - Collections.singleton(newPathTree), - update); + } else { + updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum); } } else { LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping", - oldAuthzObj); - throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" + - "with no change"); + oldAuthzObj); + throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" + + "with no change"); } } + private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree, + String newAuthzObj, String newPathTree, long seqNum) throws Exception { + if (oldPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); + sentryStore.deleteAuthzPathsMapping(oldAuthzObj, + Collections.singleton(oldPathTree), + update); + } else if (newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); + sentryStore.addAuthzPathsMapping(newAuthzObj, + Collections.singleton(newPathTree), + update); + } + + } + /** * Get path tree from a given path. It return null if encounters * SentryMalformedPathException which indicates a malformed path. @@ -383,15 +668,45 @@ class NotificationProcessor { return null; } - /** - * Split path into components on the "/" character. - * The path should not start with "/". - * This is consumed by Thrift interface, so the return result should be - * {@code List<String>} - * @param path input oath e.g. {@code foo/bar} - * @return list of commponents, e.g. [foo, bar] - */ - private List<String> splitPath(String path) { - return (Lists.newArrayList(path.split("/"))); + private void dropSentryDbPrivileges(String dbName, NotificationEvent event) { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); + authorizable.setDb(dbName); + sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: {}", + dbName); + } catch (Exception e) { + LOGGER.error("Could not process Drop database event." + "Event: " + event.toString(), e); + } + } + + private void dropSentryTablePrivileges(String dbName, String tableName, + NotificationEvent event) { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}", + dbName, tableName); + } catch (Exception e) { + LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e); + } + } + + private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, + String newTableName) throws + Exception { + TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName); + oldAuthorizable.setDb(oldDbName); + oldAuthorizable.setTable(oldTableName); + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + Update update = + getPermUpdatableOnRename(oldAuthorizable, newAuthorizable); + sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); } }
http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java new file mode 100644 index 0000000..29a85d7 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java @@ -0,0 +1,244 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + <p> + http://www.apache.org/licenses/LICENSE-2.0 + <p> + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import static com.codahale.metrics.MetricRegistry.name; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.sentry.provider.db.service.persistent.PathsImage; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.SentryMetrics; + +import org.apache.thrift.TException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper class for <Code>HiveMetaStoreClient</Code> + * + * <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to + * request HMS snapshots and also for new notifications. + */ +final class SentryHmsClient implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(SentryHmsClient.class); + private final Configuration conf; + private HiveMetaStoreClient client = null; + private HiveConnectionFactory hiveConnectionFactory; + + private static final String SNAPSHOT = "snapshot"; + /** Measures time to get full snapshot. */ + private final Timer updateTimer = SentryMetrics.getInstance() + .getTimer(name(FullUpdateInitializer.class, SNAPSHOT)); + /** Number of times update failed. */ + private final Counter failedSnapshotsCount = SentryMetrics.getInstance() + .getCounter(name(FullUpdateInitializer.class, "failed")); + + SentryHmsClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) { + this.conf = conf; + this.hiveConnectionFactory = hiveConnectionFactory; + } + + /** + * Used only for testing purposes. + *x + * @param client HiveMetaStoreClient to be initialized + */ + @VisibleForTesting + void setClient(HiveMetaStoreClient client) { + this.client = client; + } + + /** + * Used to know if the client is connected to HMS + * + * @return true if the client is connected to HMS false, if client is not connected. + */ + boolean isConnected() { + return client != null; + } + + /** + * Connects to HMS by creating HiveMetaStoreClient. + * + * @throws IOException if could not establish connection + * @throws InterruptedException if connection was interrupted + * @throws MetaException if other errors happened + */ + void connect() + throws IOException, InterruptedException, MetaException { + if (client != null) { + return; + } + client = hiveConnectionFactory.connect().getClient(); + } + + /** + * Disconnects the HMS client. + */ + public void disconnect() throws Exception { + try { + if (client != null) { + LOGGER.info("Closing the HMS client connection"); + client.close(); + } + } catch (Exception e) { + LOGGER.error("failed to close Hive Connection Factory", e); + } finally { + client = null; + } + } + + /** + * Closes the HMS client. + * + * <p>This is similar to disconnect. As this class implements AutoClosable, close should be + * implemented. + */ + public void close() throws Exception { + disconnect(); + } + + /** + * Creates HMS full snapshot. + * + * @return Full path snapshot and the last notification id on success + */ + PathsImage getFullSnapshot() { + try { + if (client == null) { + LOGGER.error("Client is not connected to HMS"); + return new PathsImage(Collections.<String, Set<String>>emptyMap(), + SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID); + } + + CurrentNotificationEventId eventIdBefore = client.getCurrentNotificationEventId(); + Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate(); + if (pathsFullSnapshot.isEmpty()) { + return new PathsImage(pathsFullSnapshot, SentryStore.EMPTY_NOTIFICATION_ID, + SentryStore.EMPTY_PATHS_SNAPSHOT_ID); + } + + CurrentNotificationEventId eventIdAfter = client.getCurrentNotificationEventId(); + LOGGER.info("NotificationID, Before Snapshot: {}, After Snapshot {}", + eventIdBefore.getEventId(), eventIdAfter.getEventId()); + // To ensure point-in-time snapshot consistency, need to make sure + // there were no HMS updates while retrieving the snapshot. If there are updates, snapshot + // is discarded. New attempt will be made after 500 milliseconds when + // HmsFollower runs again. + if (!eventIdBefore.equals(eventIdAfter)) { + LOGGER.error("Snapshot discarded, updates to HMS data while shapshot is being taken." + + "ID Before: {}. ID After: {}", eventIdBefore.getEventId(), eventIdAfter.getEventId()); + return new PathsImage(Collections.<String, Set<String>>emptyMap(), + SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID); + } + + LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.", + eventIdAfter); + // As eventIDAfter is the last event that was processed, eventIDAfter is used to update + // lastProcessedNotificationID instead of getting it from persistent store. + return new PathsImage(pathsFullSnapshot, eventIdAfter.getEventId(), + SentryStore.EMPTY_PATHS_SNAPSHOT_ID); + } catch (TException failure) { + LOGGER.error("Failed to communicate to HMS"); + return new PathsImage(Collections.<String, Set<String>>emptyMap(), + SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID); + } + } + + /** + * Retrieve a Hive full snapshot from HMS. + * + * @return HMS snapshot. Snapshot consists of a mapping from auth object name to the set of paths + * corresponding to that name. + */ + private Map<String, Set<String>> fetchFullUpdate() { + LOGGER.info("Request full HMS snapshot"); + try (FullUpdateInitializer updateInitializer = + new FullUpdateInitializer(hiveConnectionFactory, conf); + Context context = updateTimer.time()) { + Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); + LOGGER.info("Obtained full HMS snapshot"); + return pathsUpdate; + } catch (Exception ignored) { + failedSnapshotsCount.inc(); + LOGGER.error("Snapshot created failed ", ignored); + return Collections.emptyMap(); + } + } + + /** + * Returns all HMS notifications with ID greater than the specified one + * + * @param notificationId ID of the last notification that was processed. + * @return Collection of new events to be synced + */ + Collection<NotificationEvent> getNotifications(long notificationId) throws Exception { + if (client == null) { + LOGGER.error("Client is not connected to HMS"); + return Collections.emptyList(); + } + LOGGER.debug("Checking for notifications beyond {}", notificationId); + // HIVE-15761: Currently getNextNotification API may return an empty + // NotificationEventResponse causing TProtocolException. + // Workaround: Only processes the notification events newer than the last updated one. + CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); + LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId()); + if (eventId.getEventId() < notificationId) { + LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is" + + "wrong. Sentry will request a full Snapshot"); + // TODO Path Mapping info should be cleared so that HmsFollower would request for full + // snapshot in the subsequent run. + return Collections.emptyList(); + } + + if (eventId.getEventId() == notificationId) { + return Collections.emptyList(); + } + + NotificationEventResponse response = + client.getNextNotification(notificationId, Integer.MAX_VALUE, null); + if (response.isSetEvents()) { + LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}", + notificationId, response.getEvents().size()); + return response.getEvents(); + } + + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 6014a79..adb2030 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -98,7 +98,7 @@ public class SentryService implements Callable, SigUtils.SigListener { private final String keytab; private final ExecutorService serviceExecutor; private ScheduledExecutorService hmsFollowerExecutor = null; - private HMSFollower hmsFollower = null; + private HmsFollower hmsFollower = null; private Future serviceStatus; private TServer thriftServer; private Status status; @@ -108,7 +108,7 @@ public class SentryService implements Callable, SigUtils.SigListener { /* sentryStore provides the data access for sentry data. It is the singleton instance shared between various {@link SentryPolicyService}, i.e., {@link SentryPolicyStoreProcessor} and - {@link HMSFollower}. + {@link HmsFollower}. */ private final SentryStore sentryStore; private ScheduledExecutorService sentryStoreCleanService; @@ -283,11 +283,11 @@ public class SentryService implements Callable, SigUtils.SigListener { String metastoreURI = SentryServiceUtil.getHiveMetastoreURI(); if (metastoreURI == null) { - LOGGER.info("Metastore uri is not configured. Do not start HMSFollower"); + LOGGER.info("Metastore uri is not configured. Do not start HmsFollower"); return; } - LOGGER.info("Starting HMSFollower to HMS {}", metastoreURI); + LOGGER.info("Starting HmsFollower to HMS {}", metastoreURI); Preconditions.checkState(hmsFollower == null); Preconditions.checkState(hmsFollowerExecutor == null); @@ -295,8 +295,7 @@ public class SentryService implements Callable, SigUtils.SigListener { hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf()); hiveConnectionFactory.init(); - hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory); - + hmsFollower = new HmsFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory); long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS, ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT); long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, @@ -309,7 +308,7 @@ public class SentryService implements Callable, SigUtils.SigListener { hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower, initDelay, period, TimeUnit.MILLISECONDS); } catch (IllegalArgumentException e) { - LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", + LOGGER.error(String.format("Could not start HmsFollower due to illegal argument. period is %s ms", period), e); throw e; } @@ -350,8 +349,8 @@ public class SentryService implements Callable, SigUtils.SigListener { try { // close connections hmsFollower.close(); - } catch (RuntimeException ex) { - LOGGER.error("HMSFollower.close() failed", ex); + } catch (Exception ex) { + LOGGER.error("HmsFollower.close() failed", ex); } finally { hmsFollower = null; } http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java index 9c3e485..5826766 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java @@ -30,9 +30,12 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.utils.SentryConstants; import org.apache.sentry.core.common.utils.KeyValue; import org.apache.sentry.core.common.utils.PolicyFileConstants; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.apache.sentry.provider.db.service.thrift.TSentryGrantOption; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; @@ -219,8 +222,38 @@ public final class SentryServiceUtil { return hiveConf.get(METASTOREURIS.varname); } + /** + * Derives object name from database and table names by concatenating them + * + * @param authorizable for which is name is to be derived + * @return authorizable name + * @throws SentryInvalidInputException if argument provided does not have all the + * required fields set. + */ + public static String getAuthzObj(TSentryAuthorizable authorizable) + throws SentryInvalidInputException { + return getAuthzObj(authorizable.getDb(), authorizable.getTable()); + } + + /** + * Derives object name from database and table names by concatenating them + * + * @param dbName + * @param tblName + * @return authorizable name + * @throws SentryInvalidInputException if argument provided does not have all the + * required fields set. + */ + public static String getAuthzObj(String dbName, String tblName) + throws SentryInvalidInputException { + if (SentryStore.isNULL(dbName)) { + throw new SentryInvalidInputException("Invalif input, DB name is missing"); + } + return SentryStore.isNULL(tblName) ? dbName.toLowerCase() : + (dbName + "." + tblName).toLowerCase(); + } + private SentryServiceUtil() { // Make constructor private to avoid instantiation } - } http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 6e22875..cbbd3ad 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -165,7 +165,7 @@ public class ServiceConstants { .put("javax.jdo.option.Multithreaded", "true") .build(); - // InitialDelay and period time for HMSFollower thread. + // InitialDelay and period time for HmsFollower thread. public static final String SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS = "sentry.hmsfollower.init.delay.mills"; public static final long SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT = 0; public static final String SENTRY_HMSFOLLOWER_INTERVAL_MILLS = "sentry.hmsfollower.interval.mills"; http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index 51f6c5d..a8ebf7c 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -2697,7 +2697,6 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testRenameUpdateAfterReplacingANewPathsImage() throws Exception { Map<String, Set<String>> authzPaths = new HashMap<>(); - // First image to persist (this will be replaced later) authzPaths.put("db1.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1", "/user/hive/warehouse/db2.db/table1.2")); http://git-wip-us.apache.org/repos/asf/sentry/blob/e5bb466e/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java index 66ad2a1..2095469 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -16,75 +16,115 @@ */ package org.apache.sentry.service.thrift; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType; import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; import org.apache.sentry.hdfs.Updateable; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; - -import java.util.Arrays; - -import org.junit.Test; +import org.junit.BeforeClass; import org.junit.Ignore; +import org.junit.Test; import org.mockito.Mockito; -import java.util.ArrayList; -import java.util.List; +import javax.security.auth.login.LoginException; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.reset; +public class TestHmsFollower { -public class TestHMSFollower { - SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); - SentryStore sentryStore = Mockito.mock(SentryStore.class); - final static String hiveInstance = "server2"; + private final static String hiveInstance = "server2"; + private final static Configuration configuration = new Configuration(); + private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); + private final SentryStore sentryStore = Mockito.mock(SentryStore.class); + private static HiveSimpleConnectionFactory hiveConnectionFactory; + @BeforeClass + public static void setup() throws IOException, LoginException { + hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf()); + hiveConnectionFactory.init(); + configuration.set("sentry.hive.sync.create", "true"); + } + + /** + * Constructs create database event and makes sure that appropriate sentry store API's + * are invoke when the event is processed by hms follower. + * + * @throws Exception + */ @Test public void testCreateDatabase() throws Exception { String dbName = "db1"; // Create notification events - NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(), - messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + HCatEventMessage.EventType.CREATE_DATABASE.toString(), + messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)) + .toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - - Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); - hmsFollower.processNotificationEvents(events); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); authorizable.setDb("db1"); - verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + verify(sentryStore, times(1)) + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); } + /** + * Constructs drop database event and makes sure that appropriate sentry store API's + * are invoke when the event is processed by hms follower. + * + * @throws Exception + */ @Test public void testDropDatabase() throws Exception { String dbName = "db1"; // Create notification events - NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(), - messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + HCatEventMessage.EventType.DROP_DATABASE.toString(), + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)) + .toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); - hmsFollower.processNotificationEvents(events); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); authorizable.setDb("db1"); - verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + verify(sentryStore, times(1)) + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); } + /** + * Constructs create table event and makes sure that appropriate sentry store API's + * are invoke when the event is processed by hms follower. + * + * @throws Exception + */ @Test public void testCreateTable() throws Exception { String dbName = "db1"; @@ -93,23 +133,33 @@ public class TestHMSFollower { // Create notification events StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("hdfs:///db1.db/table1"); - NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(), - messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); - hmsFollower.processNotificationEvents(events); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); authorizable.setDb("db1"); authorizable.setTable(tableName); - verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + verify(sentryStore, times(1)) + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); } + /** + * Constructs drop table event and makes sure that appropriate sentry store API's + * are invoke when the event is processed by hms follower. + * + * @throws Exception + */ @Test public void testDropTable() throws Exception { String dbName = "db1"; @@ -118,23 +168,33 @@ public class TestHMSFollower { // Create notification events StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("hdfs:///db1.db/table1"); - NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(), - messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + HCatEventMessage.EventType.DROP_TABLE.toString(), + messageFactory.buildDropTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); - hmsFollower.processNotificationEvents(events); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); authorizable.setDb("db1"); authorizable.setTable(tableName); - verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + verify(sentryStore, times(1)) + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); } + /** + * Constructs rename table event and makes sure that appropriate sentry store API's + * are invoke when the event is processed by hms follower. + * + * @throws Exception + */ @Test public void testRenameTable() throws Exception { String dbName = "db1"; @@ -146,18 +206,20 @@ public class TestHMSFollower { // Create notification events StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("hdfs:///db1.db/table1"); - NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), - messageFactory.buildAlterTableMessage( - new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), - new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + HCatEventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); notificationEvent.setDbName(newDbName); notificationEvent.setTableName(newTableName); List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); - hmsFollower.processNotificationEvents(events); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -169,11 +231,20 @@ public class TestHMSFollower { newAuthorizable.setDb(newDbName); newAuthorizable.setTable(newTableName); - verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, HMSFollower.onRenameSentryPrivilege(authorizable, newAuthorizable)); + verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, + NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable)); } @Ignore + /** + * Constructs a bunch of events and passed to processor of hms follower. One of those is alter + * partition event with out actually changing anything(invalid event). Idea is to make sure that + * hms follower calls appropriate sentry store API's for the events processed by hms follower + * after processing the invalid alter partition event. + * + * @throws Exception + */ @Test public void testAlterPartitionWithInvalidEvent() throws Exception { String dbName = "db1"; @@ -181,35 +252,38 @@ public class TestHMSFollower { String tableName2 = "table2"; long inputEventId = 1; List<NotificationEvent> events = new ArrayList<>(); - NotificationEvent notificationEvent = null; + NotificationEvent notificationEvent; List<FieldSchema> partCols; - StorageDescriptor sd = null; + StorageDescriptor sd; Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + //noinspection unchecked Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); - + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); // Create a table sd = new StorageDescriptor(); sd.setLocation("hdfs://db1.db/table1"); - partCols = new ArrayList<FieldSchema>(); + partCols = new ArrayList<>(); partCols.add(new FieldSchema("ds", "string", "")); - Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, + null); notificationEvent = new NotificationEvent(inputEventId, 0, - HCatEventMessage.EventType.CREATE_TABLE.toString(), - messageFactory.buildCreateTableMessage(table).toString()); + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName1); events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); reset(sentryStore); events.clear(); @@ -218,21 +292,22 @@ public class TestHMSFollower { List<Partition> partitions = new ArrayList<>(); StorageDescriptor invalidSd = new StorageDescriptor(); invalidSd.setLocation(null); - Partition partition = new Partition(Arrays.asList("today"), dbName, tableName1, - 0, 0, sd, null); + Partition partition = new Partition(Collections.singletonList("today"), dbName, tableName1, + 0, 0, sd, null); partitions.add(partition); notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(), - messageFactory.buildAddPartitionMessage(table, partitions).toString()); + messageFactory.buildAddPartitionMessage(table, partitions).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName1); events.add(notificationEvent); inputEventId += 1; //Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); reset(sentryStore); events.clear(); @@ -241,13 +316,13 @@ public class TestHMSFollower { // This is an invalid event and should be processed by sentry store. // Event Id should be explicitly persisted using persistLastProcessedNotificationID notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(), - messageFactory.buildAlterPartitionMessage(partition, partition).toString()); + messageFactory.buildAlterPartitionMessage(partition, partition).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName1); events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that persistLastProcessedNotificationID is invoked explicitly. verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); reset(sentryStore); @@ -255,21 +330,21 @@ public class TestHMSFollower { // Create a alter notification with some actual change. sd = new StorageDescriptor(); - sd.setLocation("hdfs://user/hive/wareshouse/db1.db/table1"); + sd.setLocation("hdfs://user/hive/warehouse/db1.db/table1"); Partition updatedPartition = new Partition(partition); updatedPartition.setSd(sd); notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(), - messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString()); + messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName1); events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION // notification and persistLastProcessedNotificationID was not invoked. verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(), - Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class)); + Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1); reset(sentryStore); events.clear(); @@ -277,24 +352,34 @@ public class TestHMSFollower { // Create a table sd = new StorageDescriptor(); sd.setLocation("hdfs://db1.db/table2"); - partCols = new ArrayList<FieldSchema>(); + partCols = new ArrayList<>(); partCols.add(new FieldSchema("ds", "string", "")); - Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, + null); notificationEvent = new NotificationEvent(inputEventId, 0, - HCatEventMessage.EventType.CREATE_TABLE.toString(), - messageFactory.buildCreateTableMessage(table1).toString()); + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName2); events.add(notificationEvent); // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); } + /** + * Constructs a bunch of events and passed to processor of hms follower. One of those is alter + * table event with out actually changing anything(invalid event). Idea is to make sure that + * hms follower calls appropriate sentry store API's for the events processed by hms follower + * after processing the invalid alter table event. + * + * @throws Exception + */ @Test public void testAlterTableWithInvalidEvent() throws Exception { String dbName = "db1"; @@ -302,61 +387,67 @@ public class TestHMSFollower { String tableName2 = "table2"; long inputEventId = 1; List<NotificationEvent> events = new ArrayList<>(); - NotificationEvent notificationEvent = null; + NotificationEvent notificationEvent; List<FieldSchema> partCols; - StorageDescriptor sd = null; + StorageDescriptor sd; Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + //noinspection unchecked Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); Configuration configuration = new Configuration(); - HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); // Create a table sd = new StorageDescriptor(); sd.setLocation("hdfs://db1.db/table1"); - partCols = new ArrayList<FieldSchema>(); + partCols = new ArrayList<>(); partCols.add(new FieldSchema("ds", "string", "")); - Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, + null); notificationEvent = new NotificationEvent(inputEventId, 0, - HCatEventMessage.EventType.CREATE_TABLE.toString(), - messageFactory.buildCreateTableMessage(table).toString()); + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName1); events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); reset(sentryStore); events.clear(); - - // Create alter table notification with actuall changing anything. + // Create alter table notification with out actually changing anything. // This notification should not be processed by sentry server // Notification should be persisted explicitly - notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), - messageFactory.buildAlterTableMessage( - new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null), - new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + notificationEvent = new NotificationEvent(1, 0, + HCatEventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName1); events = new ArrayList<>(); events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked // to handle CREATE_TABLE notification // and persistLastProcessedNotificationID is explicitly invoked verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(), - Mockito.any(Updateable.Update.class)); + Mockito.any(Updateable.Update.class)); + //noinspection unchecked verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong()); reset(sentryStore); events.clear(); @@ -364,21 +455,78 @@ public class TestHMSFollower { // Create a table sd = new StorageDescriptor(); sd.setLocation("hdfs://db1.db/table2"); - partCols = new ArrayList<FieldSchema>(); + partCols = new ArrayList<>(); partCols.add(new FieldSchema("ds", "string", "")); - Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, + null); notificationEvent = new NotificationEvent(inputEventId, 0, - HCatEventMessage.EventType.CREATE_TABLE.toString(), - messageFactory.buildCreateTableMessage(table1).toString()); + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1).toString()); notificationEvent.setDbName(dbName); notificationEvent.setTableName(tableName2); events.add(notificationEvent); // Process the notification - hmsFollower.processNotificationEvents(events); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), - Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); } + + /** + * Constructs a two events and passed to processor of hms follower. First one being create table + * event with location information(Invalid Event). Idea is to make sure that hms follower calls + * appropriate sentry store API's for the event processed by hms follower after processing the + * invalid create table event. + * + * @throws Exception + */ + public void testCreateTableAfterInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + long inputEventId = 1; + + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + //noinspection unchecked + Mockito.doNothing().when(sentryStore) + .addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(), + Mockito.any(Updateable.Update.class)); + + // Create invalid notification event. The location of the storage descriptor is null, which is invalid for creating table + StorageDescriptor invalidSd = new StorageDescriptor(); + invalidSd.setLocation(null); + NotificationEvent invalidNotificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, invalidSd, null, null, null, null, null)) + .toString()); + + // Create valid notification event + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + inputEventId += 1; + NotificationEvent notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(invalidNotificationEvent); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); + + // invalid event updates notification ID directly + verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); + + // next valid event update path, which updates notification ID + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(), + Mockito.any(Updateable.Update.class)); + } }