http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java index a49d8c6..6762de7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java @@ -15,275 +15,82 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.sentry.service.thrift; -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; - -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hive.hcatalog.messaging.HCatEventMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer; -import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; -import org.apache.sentry.core.common.exception.SentryInvalidInputException; -import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; import org.apache.sentry.hdfs.PathsUpdate; -import org.apache.sentry.hdfs.PermissionsUpdate; import org.apache.sentry.hdfs.SentryMalformedPathException; -import org.apache.sentry.hdfs.Updateable.Update; -import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; +import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; /** * NotificationProcessor processes various notification events generated from - * the Hive MetaStore state change, and applies these changes to the complete + * the Hive MetaStore state change, and applies these changes on the complete * HMS Paths snapshot or delta update stored in Sentry using SentryStore. - * - * <p>NotificationProcessor should not skip processing notification events for any reason. + * <p> + * NotificationProcessor should not skip processing notification events for any reason. * If some notification events are to be skipped, appropriate logic should be added in - * HmsFollower before invoking NotificationProcessor. + * HMSFollower before invoking NotificationProcessor. */ -final class NotificationProcessor { +class NotificationProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class); + private final Logger LOGGER; private final SentryStore sentryStore; - private final SentryJSONMessageDeserializer deserializer; - private final String authServerName; - // These variables can be updated even after object is instantiated, for testing purposes. - private boolean syncStoreOnCreate = false; - private boolean syncStoreOnDrop = false; - /** - * Configuring notification processor. - * - * @param sentryStore sentry backend store - * @param authServerName Server that sentry is authorizing - * @param conf sentry configuration - */ - NotificationProcessor(SentryStore sentryStore, String authServerName, - Configuration conf) { + NotificationProcessor(SentryStore sentryStore, Logger LOGGER) { + this.LOGGER = LOGGER; this.sentryStore = sentryStore; - deserializer = new SentryJSONMessageDeserializer(); - this.authServerName = authServerName; - syncStoreOnCreate = Boolean - .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(), - AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault())); - syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(), - AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault())); - } - - /** - * Split path into components on the "/" character. - * The path should not start with "/". - * This is consumed by Thrift interface, so the return result should be - * {@code List<String>} - * - * @param path input oath e.g. {@code foo/bar} - * @return list of components, e.g. [foo, bar] - */ - private static List<String> splitPath(String path) { - return (Lists.newArrayList(path.split("/"))); - } - - /** - * Constructs permission update to be persisted for drop event that can be persisted - * from thrift object. - * - * @param authorizable thrift object that is dropped. - * @return update to be persisted - * @throws SentryInvalidInputException if the required fields are set in argument provided - */ - @VisibleForTesting - static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable) - throws SentryInvalidInputException { - PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); - String authzObj = SentryServiceUtil.getAuthzObj(authorizable); - update.addPrivilegeUpdate(authzObj) - .putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); - return update; - } - - /** - * Constructs permission update to be persisted for rename event that can be persisted from thrift - * object. - * - * @param oldAuthorizable old thrift object - * @param newAuthorizable new thrift object - * @return update to be persisted - * @throws SentryInvalidInputException if the required fields are set in arguments provided - */ - @VisibleForTesting - static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable, - TSentryAuthorizable newAuthorizable) - throws SentryInvalidInputException { - String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable); - String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable); - PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); - TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); - privUpdate.putToAddPrivileges(newAuthz, newAuthz); - privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); - return update; - } - - /** - * This function is only used for testing purposes. - * - * @param value to be set - */ - @SuppressWarnings("SameParameterValue") - @VisibleForTesting - void setSyncStoreOnCreate(boolean value) { - syncStoreOnCreate = value; - } - - /** - * This function is only used for testing purposes. - * - * @param value to be set - */ - @SuppressWarnings("SameParameterValue") - @VisibleForTesting - void setSyncStoreOnDrop(boolean value) { - syncStoreOnDrop = value; - } - - /** - * Processes the event and persist to sentry store. - * - * @param event to be processed - * @return true, if the event is persisted to sentry store. false, if the event is not persisted. - * @throws Exception if there is an error processing the event. - */ - boolean processNotificationEvent(NotificationEvent event) throws Exception { - LOGGER - .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType()); - switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { - case CREATE_DATABASE: - return processCreateDatabase(event); - case DROP_DATABASE: - return processDropDatabase(event); - case CREATE_TABLE: - return processCreateTable(event); - case DROP_TABLE: - return processDropTable(event); - case ALTER_TABLE: - return processAlterTable(event); - case ADD_PARTITION: - return processAddPartition(event); - case DROP_PARTITION: - return processDropPartition(event); - case ALTER_PARTITION: - return processAlterPartition(event); - case INSERT: - return false; - default: - LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(), - event.getEventType()); - return false; - } } /** * Processes "create database" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param location database location + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processCreateDatabase(NotificationEvent event) throws Exception { - SentryJSONCreateDatabaseMessage message = - deserializer.getCreateDatabaseMessage(event.getMessage()); - String dbName = message.getDB(); - String location = message.getLocation(); - if ((dbName == null) || (location == null)) { - LOGGER.error("Create database event " - + "has incomplete information. dbName: {} location: {}", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(location, "null")); - return false; - } + void processCreateDatabase(String dbName, String location, long seqNum) throws Exception { List<String> locations = Collections.singletonList(location); - addPaths(dbName, locations, event.getEventId()); - if (syncStoreOnCreate) { - dropSentryDbPrivileges(dbName, event); - } - return true; + addPaths(dbName, locations, seqNum); } /** * Processes "drop database" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param location database location + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processDropDatabase(NotificationEvent event) throws Exception { - SentryJSONDropDatabaseMessage dropDatabaseMessage = - deserializer.getDropDatabaseMessage(event.getMessage()); - String dbName = dropDatabaseMessage.getDB(); - String location = dropDatabaseMessage.getLocation(); - if (dbName == null) { - LOGGER.error("Drop database event has incomplete information: dbName = null"); - return false; - } - if (syncStoreOnDrop) { - dropSentryDbPrivileges(dbName, event); - } + void processDropDatabase(String dbName, String location, long seqNum) throws Exception { List<String> locations = Collections.singletonList(location); - removePaths(dbName, locations, event.getEventId()); - return true; + removePaths(dbName, locations, seqNum); } /** * Processes "create table" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param tableName table name + * @param location table location + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processCreateTable(NotificationEvent event) - throws Exception { - SentryJSONCreateTableMessage createTableMessage = deserializer - .getCreateTableMessage(event.getMessage()); - String dbName = createTableMessage.getDB(); - String tableName = createTableMessage.getTable(); - String location = createTableMessage.getLocation(); - if ((dbName == null) || (tableName == null) || (location == null)) { - LOGGER.error(String.format("Create table event " + "has incomplete information." - + " dbName = %s, tableName = %s, location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(location, "null"))); - return false; - } - if (syncStoreOnCreate) { - dropSentryTablePrivileges(dbName, tableName, event); - } - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + void processCreateTable(String dbName, String tableName, String location, long seqNum) + throws Exception { + String authzObj = dbName + "." + tableName; List<String> locations = Collections.singletonList(location); - addPaths(authzObj, locations, event.getEventId()); - return true; + addPaths(authzObj, locations, seqNum); } /** @@ -291,185 +98,86 @@ final class NotificationProcessor { * the table as well. And applies its corresponding snapshot change as well * as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param tableName table name + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processDropTable(NotificationEvent event) throws Exception { - SentryJSONDropTableMessage dropTableMessage = deserializer - .getDropTableMessage(event.getMessage()); - String dbName = dropTableMessage.getDB(); - String tableName = dropTableMessage.getTable(); - if ((dbName == null) || (tableName == null)) { - LOGGER.error("Drop table event " - + "has incomplete information. dbName: {}, tableName: {}", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null")); - return false; - } - if (syncStoreOnDrop) { - dropSentryTablePrivileges(dbName, tableName, event); - } - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - removeAllPaths(authzObj, event.getEventId()); - return true; + void processDropTable(String dbName, String tableName, long seqNum) throws Exception { + String authzObj = dbName + "." + tableName; + removeAllPaths(authzObj, seqNum); } /** * Processes "alter table" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param oldDbName old database name + * @param newDbName new database name + * @param oldTableName old table name + * @param newTableName new table name + * @param oldLocation old table location + * @param newLocation new table location + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processAlterTable(NotificationEvent event) throws Exception { - SentryJSONAlterTableMessage alterTableMessage = - deserializer.getAlterTableMessage(event.getMessage()); - String oldDbName = alterTableMessage.getDB(); - String oldTableName = alterTableMessage.getTable(); - String newDbName = event.getDbName(); - String newTableName = event.getTableName(); - String oldLocation = alterTableMessage.getOldLocation(); - String newLocation = alterTableMessage.getNewLocation(); - - if ((oldDbName == null) - || (oldTableName == null) - || (newDbName == null) - || (newTableName == null) - || (oldLocation == null) - || (newLocation == null)) { - LOGGER.error(String.format("Alter table event " - + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " - + "newDbName = %s, newTableName = %s, newLocation = %s", - StringUtils.defaultIfBlank(oldDbName, "null"), - StringUtils.defaultIfBlank(oldTableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newDbName, "null"), - StringUtils.defaultIfBlank(newTableName, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - return false; - } - - if ((oldDbName.equals(newDbName)) - && (oldTableName.equals(newTableName)) - && (oldLocation.equals(newLocation))) { - LOGGER.error(String.format("Alter table notification ignored as neither name nor " - + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " - + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation, - newDbName + "." + newTableName, newLocation)); - return false; - } - - if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { - // Name has changed - try { - renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:" - + " {}.{}", oldDbName, oldTableName); - } catch (Exception e) { - LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e); - return false; - } - } + void processAlterTable(String oldDbName, String newDbName, String oldTableName, + String newTableName, String oldLocation, String newLocation, long seqNum) + throws Exception { String oldAuthzObj = oldDbName + "." + oldTableName; String newAuthzObj = newDbName + "." + newTableName; - renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId()); - return true; + renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, seqNum); } /** * Processes "add partition" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param tableName table name + * @param locations partition locations + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processAddPartition(NotificationEvent event) - throws Exception { - SentryJSONAddPartitionMessage addPartitionMessage = - deserializer.getAddPartitionMessage(event.getMessage()); - String dbName = addPartitionMessage.getDB(); - String tableName = addPartitionMessage.getTable(); - List<String> locations = addPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - LOGGER.error(String.format("Create table event has incomplete information. " - + "dbName = %s, tableName = %s, locations = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - locations != null ? locations.toString() : "null")); - return false; - } - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - addPaths(authzObj, locations, event.getEventId()); - return true; + void processAddPartition(String dbName, String tableName, + Collection<String> locations, long seqNum) + throws Exception { + String authzObj = dbName + "." + tableName; + addPaths(authzObj, locations, seqNum); } /** * Processes "drop partition" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param tableName table name + * @param locations partition locations + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processDropPartition(NotificationEvent event) - throws Exception { - SentryJSONDropPartitionMessage dropPartitionMessage = - deserializer.getDropPartitionMessage(event.getMessage()); - String dbName = dropPartitionMessage.getDB(); - String tableName = dropPartitionMessage.getTable(); - List<String> locations = dropPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - LOGGER.error(String.format("Drop partition event " - + "has incomplete information. dbName = %s, tableName = %s, location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - locations != null ? locations.toString() : "null")); - return false; - } - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - removePaths(authzObj, locations, event.getEventId()); - return true; + void processDropPartition(String dbName, String tableName, + Collection<String> locations, long seqNum) + throws Exception { + String authzObj = dbName + "." + tableName; + removePaths(authzObj, locations, seqNum); } /** * Processes "alter partition" notification event, and applies its corresponding * snapshot change as well as delta path update into Sentry DB. * - * @param event notification event to be processed. + * @param dbName database name + * @param tableName table name + * @param oldLocation old partition location + * @param newLocation new partition location + * @param seqNum notification event ID * @throws Exception if encounters errors while persisting the path change */ - private boolean processAlterPartition(NotificationEvent event) throws Exception { - SentryJSONAlterPartitionMessage alterPartitionMessage = - deserializer.getAlterPartitionMessage(event.getMessage()); - String dbName = alterPartitionMessage.getDB(); - String tableName = alterPartitionMessage.getTable(); - String oldLocation = alterPartitionMessage.getOldLocation(); - String newLocation = alterPartitionMessage.getNewLocation(); - - if ((dbName == null) - || (tableName == null) - || (oldLocation == null) - || (newLocation == null)) { - LOGGER.error(String.format("Alter partition event " - + "has incomplete information. dbName = %s, tableName = %s, " - + "oldLocation = %s, newLocation = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - return false; - } - - if (oldLocation.equals(newLocation)) { - LOGGER.info(String.format("Alter partition notification ignored as" - + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." - + "." + tableName, oldLocation)); - return false; - } - + void processAlterPartition(String dbName, String tableName, String oldLocation, + String newLocation, long seqNum) throws Exception { String oldAuthzObj = dbName + "." + tableName; - renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event.getEventId()); - return true; + renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, seqNum); } /** @@ -479,9 +187,10 @@ final class NotificationProcessor { * @param authzObj the given authzObj * @param locations a set of paths need to be added * @param seqNum notification event ID + * @throws Exception */ private void addPaths(String authzObj, Collection<String> locations, long seqNum) - throws Exception { + throws Exception { // AuthzObj is case insensitive authzObj = authzObj.toLowerCase(); @@ -492,13 +201,13 @@ final class NotificationProcessor { for (String location : locations) { String pathTree = getPath(location); if (pathTree == null) { - LOGGER.debug("HMS Path Update [" + LOGGER.debug("#### HMS Path Update [" + "OP : addPath, " + "authzObj : " + authzObj + ", " + "path : " + location + "] - nothing to add" + ", " + "notification event ID: " + seqNum + "]"); } else { - LOGGER.debug("HMS Path Update [" + LOGGER.debug("#### HMS Path Update [" + "OP : addPath, " + "authzObj : " + authzObj + ", " + "path : " + location + ", " @@ -517,9 +226,10 @@ final class NotificationProcessor { * @param authzObj the given authzObj * @param locations a set of paths need to be removed * @param seqNum notification event ID + * @throws Exception */ private void removePaths(String authzObj, Collection<String> locations, long seqNum) - throws Exception { + throws Exception { // AuthzObj is case insensitive authzObj = authzObj.toLowerCase(); @@ -528,13 +238,13 @@ final class NotificationProcessor { for (String location : locations) { String pathTree = getPath(location); if (pathTree == null) { - LOGGER.debug("HMS Path Update [" + LOGGER.debug("#### HMS Path Update [" + "OP : removePath, " + "authzObj : " + authzObj + ", " + "path : " + location + "] - nothing to remove" + ", " + "notification event ID: " + seqNum + "]"); } else { - LOGGER.debug("HMS Path Update [" + LOGGER.debug("#### HMS Path Update [" + "OP : removePath, " + "authzObj : " + authzObj + ", " + "path : " + location + ", " @@ -553,13 +263,14 @@ final class NotificationProcessor { * * @param authzObj the given authzObj to be deleted * @param seqNum notification event ID + * @throws Exception */ private void removeAllPaths(String authzObj, long seqNum) - throws Exception { + throws Exception { // AuthzObj is case insensitive authzObj = authzObj.toLowerCase(); - LOGGER.debug("HMS Path Update [" + LOGGER.debug("#### HMS Path Update [" + "OP : removeAllPaths, " + "authzObj : " + authzObj + ", " + "notification event ID: " + seqNum + "]"); @@ -578,19 +289,21 @@ final class NotificationProcessor { * @param newAuthzObj the new name to be changed to * @param oldLocation a existing path of the given authzObj * @param newLocation a new path to be changed to + * @param seqNum + * @throws Exception */ private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation, - String newLocation, long seqNum) throws Exception { + String newLocation, long seqNum) throws Exception { // AuthzObj is case insensitive oldAuthzObj = oldAuthzObj.toLowerCase(); newAuthzObj = newAuthzObj.toLowerCase(); String oldPathTree = getPath(oldLocation); String newPathTree = getPath(newLocation); - LOGGER.debug("HMS Path Update [" + LOGGER.debug("#### HMS Path Update [" + "OP : renameAuthzObject, " + "oldAuthzObj : " + oldAuthzObj + ", " - + "newAuthzObj : " + newAuthzObj + ", " + + "newAuthzObj : " + newAuthzObj + ", " + "oldLocation : " + oldLocation + ", " + "newLocation : " + newLocation + ", " + "notification event ID: " + seqNum + "]"); @@ -610,10 +323,20 @@ final class NotificationProcessor { // Both name and location has changed // - Alter table rename for managed table sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree, - newPathTree, update); + newPathTree, update); } - } else { - updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum); + } else if (oldPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); + sentryStore.deleteAuthzPathsMapping(oldAuthzObj, + Collections.singleton(oldPathTree), + update); + } else if (newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); + sentryStore.addAuthzPathsMapping(newAuthzObj, + Collections.singleton(newPathTree), + update); } } else if (!oldLocation.equals(newLocation)) { // Only Location has changed, e.g. Alter table set location @@ -623,35 +346,27 @@ final class NotificationProcessor { update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree)); sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newPathTree, update); - } else { - updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum); + } else if (oldPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); + sentryStore.deleteAuthzPathsMapping(oldAuthzObj, + Collections.singleton(oldPathTree), + update); + } else if (newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree)); + sentryStore.addAuthzPathsMapping(oldAuthzObj, + Collections.singleton(newPathTree), + update); } } else { LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping", - oldAuthzObj); - throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" - + "with no change"); + oldAuthzObj); + throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" + + "with no change"); } } - private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree, - String newAuthzObj, String newPathTree, long seqNum) throws Exception { - if (oldPathTree != null) { - PathsUpdate update = new PathsUpdate(seqNum, false); - update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); - sentryStore.deleteAuthzPathsMapping(oldAuthzObj, - Collections.singleton(oldPathTree), - update); - } else if (newPathTree != null) { - PathsUpdate update = new PathsUpdate(seqNum, false); - update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); - sentryStore.addAuthzPathsMapping(newAuthzObj, - Collections.singleton(newPathTree), - update); - } - - } - /** * Get path tree from a given path. It return null if encounters * SentryMalformedPathException which indicates a malformed path. @@ -668,45 +383,15 @@ final class NotificationProcessor { return null; } - private void dropSentryDbPrivileges(String dbName, NotificationEvent event) { - try { - TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); - authorizable.setDb(dbName); - sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: {}", - dbName); - } catch (Exception e) { - LOGGER.error("Could not process Drop database event." + "Event: " + event.toString(), e); - } - } - - private void dropSentryTablePrivileges(String dbName, String tableName, - NotificationEvent event) { - try { - TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); - authorizable.setDb(dbName); - authorizable.setTable(tableName); - sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}", - dbName, tableName); - } catch (Exception e) { - LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e); - } - } - - private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, - String newTableName) throws - Exception { - TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName); - oldAuthorizable.setDb(oldDbName); - oldAuthorizable.setTable(oldTableName); - TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName); - newAuthorizable.setDb(newDbName); - newAuthorizable.setTable(newTableName); - Update update = - getPermUpdatableOnRename(oldAuthorizable, newAuthorizable); - sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); + /** + * Split path into components on the "/" character. + * The path should not start with "/". + * This is consumed by Thrift interface, so the return result should be + * {@code List<String>} + * @param path input oath e.g. {@code foo/bar} + * @return list of commponents, e.g. [foo, bar] + */ + private List<String> splitPath(String path) { + return (Lists.newArrayList(path.split("/"))); } }
http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java deleted file mode 100644 index 29a85d7..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - <p> - http://www.apache.org/licenses/LICENSE-2.0 - <p> - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import static com.codahale.metrics.MetricRegistry.name; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; -import com.codahale.metrics.Timer.Context; - -import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.sentry.provider.db.service.persistent.PathsImage; -import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.provider.db.service.thrift.SentryMetrics; - -import org.apache.thrift.TException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Wrapper class for <Code>HiveMetaStoreClient</Code> - * - * <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to - * request HMS snapshots and also for new notifications. - */ -final class SentryHmsClient implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(SentryHmsClient.class); - private final Configuration conf; - private HiveMetaStoreClient client = null; - private HiveConnectionFactory hiveConnectionFactory; - - private static final String SNAPSHOT = "snapshot"; - /** Measures time to get full snapshot. */ - private final Timer updateTimer = SentryMetrics.getInstance() - .getTimer(name(FullUpdateInitializer.class, SNAPSHOT)); - /** Number of times update failed. */ - private final Counter failedSnapshotsCount = SentryMetrics.getInstance() - .getCounter(name(FullUpdateInitializer.class, "failed")); - - SentryHmsClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) { - this.conf = conf; - this.hiveConnectionFactory = hiveConnectionFactory; - } - - /** - * Used only for testing purposes. - *x - * @param client HiveMetaStoreClient to be initialized - */ - @VisibleForTesting - void setClient(HiveMetaStoreClient client) { - this.client = client; - } - - /** - * Used to know if the client is connected to HMS - * - * @return true if the client is connected to HMS false, if client is not connected. - */ - boolean isConnected() { - return client != null; - } - - /** - * Connects to HMS by creating HiveMetaStoreClient. - * - * @throws IOException if could not establish connection - * @throws InterruptedException if connection was interrupted - * @throws MetaException if other errors happened - */ - void connect() - throws IOException, InterruptedException, MetaException { - if (client != null) { - return; - } - client = hiveConnectionFactory.connect().getClient(); - } - - /** - * Disconnects the HMS client. - */ - public void disconnect() throws Exception { - try { - if (client != null) { - LOGGER.info("Closing the HMS client connection"); - client.close(); - } - } catch (Exception e) { - LOGGER.error("failed to close Hive Connection Factory", e); - } finally { - client = null; - } - } - - /** - * Closes the HMS client. - * - * <p>This is similar to disconnect. As this class implements AutoClosable, close should be - * implemented. - */ - public void close() throws Exception { - disconnect(); - } - - /** - * Creates HMS full snapshot. - * - * @return Full path snapshot and the last notification id on success - */ - PathsImage getFullSnapshot() { - try { - if (client == null) { - LOGGER.error("Client is not connected to HMS"); - return new PathsImage(Collections.<String, Set<String>>emptyMap(), - SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID); - } - - CurrentNotificationEventId eventIdBefore = client.getCurrentNotificationEventId(); - Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate(); - if (pathsFullSnapshot.isEmpty()) { - return new PathsImage(pathsFullSnapshot, SentryStore.EMPTY_NOTIFICATION_ID, - SentryStore.EMPTY_PATHS_SNAPSHOT_ID); - } - - CurrentNotificationEventId eventIdAfter = client.getCurrentNotificationEventId(); - LOGGER.info("NotificationID, Before Snapshot: {}, After Snapshot {}", - eventIdBefore.getEventId(), eventIdAfter.getEventId()); - // To ensure point-in-time snapshot consistency, need to make sure - // there were no HMS updates while retrieving the snapshot. If there are updates, snapshot - // is discarded. New attempt will be made after 500 milliseconds when - // HmsFollower runs again. - if (!eventIdBefore.equals(eventIdAfter)) { - LOGGER.error("Snapshot discarded, updates to HMS data while shapshot is being taken." - + "ID Before: {}. ID After: {}", eventIdBefore.getEventId(), eventIdAfter.getEventId()); - return new PathsImage(Collections.<String, Set<String>>emptyMap(), - SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID); - } - - LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.", - eventIdAfter); - // As eventIDAfter is the last event that was processed, eventIDAfter is used to update - // lastProcessedNotificationID instead of getting it from persistent store. - return new PathsImage(pathsFullSnapshot, eventIdAfter.getEventId(), - SentryStore.EMPTY_PATHS_SNAPSHOT_ID); - } catch (TException failure) { - LOGGER.error("Failed to communicate to HMS"); - return new PathsImage(Collections.<String, Set<String>>emptyMap(), - SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID); - } - } - - /** - * Retrieve a Hive full snapshot from HMS. - * - * @return HMS snapshot. Snapshot consists of a mapping from auth object name to the set of paths - * corresponding to that name. - */ - private Map<String, Set<String>> fetchFullUpdate() { - LOGGER.info("Request full HMS snapshot"); - try (FullUpdateInitializer updateInitializer = - new FullUpdateInitializer(hiveConnectionFactory, conf); - Context context = updateTimer.time()) { - Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); - LOGGER.info("Obtained full HMS snapshot"); - return pathsUpdate; - } catch (Exception ignored) { - failedSnapshotsCount.inc(); - LOGGER.error("Snapshot created failed ", ignored); - return Collections.emptyMap(); - } - } - - /** - * Returns all HMS notifications with ID greater than the specified one - * - * @param notificationId ID of the last notification that was processed. - * @return Collection of new events to be synced - */ - Collection<NotificationEvent> getNotifications(long notificationId) throws Exception { - if (client == null) { - LOGGER.error("Client is not connected to HMS"); - return Collections.emptyList(); - } - LOGGER.debug("Checking for notifications beyond {}", notificationId); - // HIVE-15761: Currently getNextNotification API may return an empty - // NotificationEventResponse causing TProtocolException. - // Workaround: Only processes the notification events newer than the last updated one. - CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); - LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId()); - if (eventId.getEventId() < notificationId) { - LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is" - + "wrong. Sentry will request a full Snapshot"); - // TODO Path Mapping info should be cleared so that HmsFollower would request for full - // snapshot in the subsequent run. - return Collections.emptyList(); - } - - if (eventId.getEventId() == notificationId) { - return Collections.emptyList(); - } - - NotificationEventResponse response = - client.getNextNotification(notificationId, Integer.MAX_VALUE, null); - if (response.isSetEvents()) { - LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}", - notificationId, response.getEvents().size()); - return response.getEvents(); - } - - return Collections.emptyList(); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index adb2030..6014a79 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -98,7 +98,7 @@ public class SentryService implements Callable, SigUtils.SigListener { private final String keytab; private final ExecutorService serviceExecutor; private ScheduledExecutorService hmsFollowerExecutor = null; - private HmsFollower hmsFollower = null; + private HMSFollower hmsFollower = null; private Future serviceStatus; private TServer thriftServer; private Status status; @@ -108,7 +108,7 @@ public class SentryService implements Callable, SigUtils.SigListener { /* sentryStore provides the data access for sentry data. It is the singleton instance shared between various {@link SentryPolicyService}, i.e., {@link SentryPolicyStoreProcessor} and - {@link HmsFollower}. + {@link HMSFollower}. */ private final SentryStore sentryStore; private ScheduledExecutorService sentryStoreCleanService; @@ -283,11 +283,11 @@ public class SentryService implements Callable, SigUtils.SigListener { String metastoreURI = SentryServiceUtil.getHiveMetastoreURI(); if (metastoreURI == null) { - LOGGER.info("Metastore uri is not configured. Do not start HmsFollower"); + LOGGER.info("Metastore uri is not configured. Do not start HMSFollower"); return; } - LOGGER.info("Starting HmsFollower to HMS {}", metastoreURI); + LOGGER.info("Starting HMSFollower to HMS {}", metastoreURI); Preconditions.checkState(hmsFollower == null); Preconditions.checkState(hmsFollowerExecutor == null); @@ -295,7 +295,8 @@ public class SentryService implements Callable, SigUtils.SigListener { hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf()); hiveConnectionFactory.init(); - hmsFollower = new HmsFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory); + hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory); + long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS, ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT); long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, @@ -308,7 +309,7 @@ public class SentryService implements Callable, SigUtils.SigListener { hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower, initDelay, period, TimeUnit.MILLISECONDS); } catch (IllegalArgumentException e) { - LOGGER.error(String.format("Could not start HmsFollower due to illegal argument. period is %s ms", + LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", period), e); throw e; } @@ -349,8 +350,8 @@ public class SentryService implements Callable, SigUtils.SigListener { try { // close connections hmsFollower.close(); - } catch (Exception ex) { - LOGGER.error("HmsFollower.close() failed", ex); + } catch (RuntimeException ex) { + LOGGER.error("HMSFollower.close() failed", ex); } finally { hmsFollower = null; } http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java index 5826766..9c3e485 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java @@ -30,12 +30,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.utils.SentryConstants; import org.apache.sentry.core.common.utils.KeyValue; import org.apache.sentry.core.common.utils.PolicyFileConstants; -import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.apache.sentry.provider.db.service.thrift.TSentryGrantOption; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; @@ -222,38 +219,8 @@ public final class SentryServiceUtil { return hiveConf.get(METASTOREURIS.varname); } - /** - * Derives object name from database and table names by concatenating them - * - * @param authorizable for which is name is to be derived - * @return authorizable name - * @throws SentryInvalidInputException if argument provided does not have all the - * required fields set. - */ - public static String getAuthzObj(TSentryAuthorizable authorizable) - throws SentryInvalidInputException { - return getAuthzObj(authorizable.getDb(), authorizable.getTable()); - } - - /** - * Derives object name from database and table names by concatenating them - * - * @param dbName - * @param tblName - * @return authorizable name - * @throws SentryInvalidInputException if argument provided does not have all the - * required fields set. - */ - public static String getAuthzObj(String dbName, String tblName) - throws SentryInvalidInputException { - if (SentryStore.isNULL(dbName)) { - throw new SentryInvalidInputException("Invalif input, DB name is missing"); - } - return SentryStore.isNULL(tblName) ? dbName.toLowerCase() : - (dbName + "." + tblName).toLowerCase(); - } - private SentryServiceUtil() { // Make constructor private to avoid instantiation } + } http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index cbbd3ad..6e22875 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -165,7 +165,7 @@ public class ServiceConstants { .put("javax.jdo.option.Multithreaded", "true") .build(); - // InitialDelay and period time for HmsFollower thread. + // InitialDelay and period time for HMSFollower thread. public static final String SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS = "sentry.hmsfollower.init.delay.mills"; public static final long SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT = 0; public static final String SENTRY_HMSFOLLOWER_INTERVAL_MILLS = "sentry.hmsfollower.interval.mills"; http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index a8ebf7c..51f6c5d 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -2697,6 +2697,7 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testRenameUpdateAfterReplacingANewPathsImage() throws Exception { Map<String, Set<String>> authzPaths = new HashMap<>(); + // First image to persist (this will be replaced later) authzPaths.put("db1.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1", "/user/hive/warehouse/db2.db/table1.2")); http://git-wip-us.apache.org/repos/asf/sentry/blob/07d3ec17/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java new file mode 100644 index 0000000..66ad2a1 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.service.thrift; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; +import org.apache.sentry.hdfs.Updateable; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; + +import java.util.Arrays; + +import org.junit.Test; +import org.junit.Ignore; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.reset; + +public class TestHMSFollower { + SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); + SentryStore sentryStore = Mockito.mock(SentryStore.class); + final static String hiveInstance = "server2"; + + @Test + public void testCreateDatabase() throws Exception { + String dbName = "db1"; + + // Create notification events + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(), + messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + } + + @Test + public void testDropDatabase() throws Exception { + String dbName = "db1"; + + // Create notification events + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(), + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + } + + @Test + public void testCreateTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + } + + @Test + public void testDropTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(), + messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable)); + } + + @Test + public void testRenameTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + String newDbName = "db1"; + String newTableName = "table2"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + notificationEvent.setDbName(newDbName); + notificationEvent.setTableName(newTableName); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + + verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, HMSFollower.onRenameSentryPrivilege(authorizable, newAuthorizable)); + } + + + @Ignore + @Test + public void testAlterPartitionWithInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName1 = "table1"; + String tableName2 = "table2"; + long inputEventId = 1; + List<NotificationEvent> events = new ArrayList<>(); + NotificationEvent notificationEvent = null; + List<FieldSchema> partCols; + StorageDescriptor sd = null; + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a partition + List<Partition> partitions = new ArrayList<>(); + StorageDescriptor invalidSd = new StorageDescriptor(); + invalidSd.setLocation(null); + Partition partition = new Partition(Arrays.asList("today"), dbName, tableName1, + 0, 0, sd, null); + partitions.add(partition); + notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(), + messageFactory.buildAddPartitionMessage(table, partitions).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + //Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a alter notification with out actually changing anything. + // This is an invalid event and should be processed by sentry store. + // Event Id should be explicitly persisted using persistLastProcessedNotificationID + notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(), + messageFactory.buildAlterPartitionMessage(partition, partition).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that persistLastProcessedNotificationID is invoked explicitly. + verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); + reset(sentryStore); + events.clear(); + + // Create a alter notification with some actual change. + sd = new StorageDescriptor(); + sd.setLocation("hdfs://user/hive/wareshouse/db1.db/table1"); + Partition updatedPartition = new Partition(partition); + updatedPartition.setSd(sd); + notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(), + messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION + // notification and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1); + reset(sentryStore); + events.clear(); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table2"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName2); + events.add(notificationEvent); + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + @Test + public void testAlterTableWithInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName1 = "table1"; + String tableName2 = "table2"; + long inputEventId = 1; + List<NotificationEvent> events = new ArrayList<>(); + NotificationEvent notificationEvent = null; + List<FieldSchema> partCols; + StorageDescriptor sd = null; + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + + // Create alter table notification with actuall changing anything. + // This notification should not be processed by sentry server + // Notification should be persisted explicitly + notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events = new ArrayList<>(); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked + // to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID is explicitly invoked + verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(), + Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table2"); + partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName2); + events.add(notificationEvent); + // Process the notification + hmsFollower.processNotificationEvents(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(Updateable.Update.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } +}