Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign d3aeb958a -> a5e772b51
SENTRY-1587: Refactor SentryStore transaction to persist a single path transcation bundled with corresponding delta path change (Hao Hao, Reviewed by: Alexander Kolbasov and Na Li) Change-Id: I3d083b8d73c45551ecfdd06f26327aa57be8710d Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a5e772b5 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a5e772b5 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a5e772b5 Branch: refs/heads/sentry-ha-redesign Commit: a5e772b516579ad65ce9b759389ed695dc88bffe Parents: d3aeb95 Author: hahao <[email protected]> Authored: Thu Apr 20 19:17:21 2017 -0700 Committer: hahao <[email protected]> Committed: Fri Apr 21 14:30:20 2017 -0700 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 2 +- .../org/apache/sentry/hdfs/PathsUpdate.java | 2 +- .../db/service/model/MAuthzPathsMapping.java | 24 ++ .../db/service/persistent/SentryStore.java | 329 +++++++++++++++- .../sentry/service/thrift/HMSFollower.java | 241 +++++++----- .../service/thrift/NotificationProcessor.java | 377 +++++++++++++++++++ .../db/service/persistent/TestSentryStore.java | 195 +++++++--- .../e2e/hdfs/TestHDFSIntegrationAdvanced.java | 5 +- .../tests/e2e/hdfs/TestHDFSIntegrationBase.java | 5 +- .../e2e/hdfs/TestHDFSIntegrationEnd2End.java | 2 +- 10 files changed, 1016 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index 90aaaef..d876d23 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -343,7 +343,7 @@ public final class FullUpdateInitializer implements AutoCloseable { List<List<String>> addPaths = pathChanges.getAddPaths(); Set<String> paths = new HashSet<>(addPaths.size()); for (List<String> addPath : addPaths) { - paths.add(PathsUpdate.cancatePath(addPath)); + paths.add(PathsUpdate.concatenatePath(addPath)); } authzObjToPath.put(pathChanges.getAuthzObj(), paths); } http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java index 14e967a..e32d4a7 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java @@ -161,7 +161,7 @@ public class PathsUpdate implements Updateable.Update { * @param paths * @return a path string concatenated by "/". */ - public static String cancatePath(Iterable<String> paths) { + public static String concatenatePath(Iterable<String> paths) { return Joiner.on("/").join(paths); } http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsMapping.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsMapping.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsMapping.java index c22364f..c710701 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsMapping.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsMapping.java @@ -66,6 +66,30 @@ public class MAuthzPathsMapping { return paths; } + public boolean removePath(MPath path) { + return paths.remove(path); + } + + public void addPath(MPath path) { + paths.add(path); + } + + /** + * Gets MPath object that has the given path value. + * TODO: Try to avoid loop lookup in future for performance improvement. + * + * @param path the given path name + * @return an Path object that has the given path value. + */ + public MPath getPath(String path) { + for (MPath mPath : paths) { + if (mPath.getPath().equals(path)) { + return mPath; + } + } + return null; + } + /* This method is used to get path set in string format constructed from Set<MPath>. http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 08520f3..8b88c9a 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -2519,17 +2519,15 @@ public class SentryStore { }); } - public void createAuthzPathsMapping(final String hiveObj, - final Set<String> paths) throws Exception { - tm.executeTransactionWithRetry( - new TransactionBlock<Object>() { - public Object execute(PersistenceManager pm) throws Exception { - createAuthzPathsMappingCore(pm, hiveObj, paths); - return null; - } - }); - } - + /** + * Create an entry for the given authzObj and with a set of paths in + * the authzObj -> [Paths] mapping. + * + * @param authzObj an authzObj + * @param paths a set of paths need to be added into the authzObj -> [Paths] mapping + * @throws SentryAlreadyExistsException if this authzObj has already exist + * in the mapping. + */ private void createAuthzPathsMappingCore(PersistenceManager pm, String authzObj, Set<String> paths) throws SentryAlreadyExistsException { @@ -2545,16 +2543,297 @@ public class SentryStore { } /** + * Adds the authzObj and with a set of paths into the authzObj -> [Paths] mapping. + * As well as persist the corresponding delta path change to MSentryPathChange + * table in a single transaction. + * + * @param authzObj an authzObj + * @param paths a set of paths need to be added into the authzObj -> [Paths] mapping + * @param update the corresponding path delta update + * @throws Exception + */ + public void addAuthzPathsMapping(final String authzObj, final Set<String> paths, + final Update update) throws Exception { + execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + addAuthzPathsMappingCore(pm, authzObj, paths); + return null; + } + }); + } + + /** + * Adds the authzObj and with a set of paths into the authzObj -> [Paths] mapping. + * If the given authzObj already exists in the mapping, only need to add the new paths + * into its mapping. + * + * @param pm PersistenceManager + * @param authzObj an authzObj + * @param paths a set of paths need to be added into the authzObj -> [Paths] mapping + */ + private void addAuthzPathsMappingCore(PersistenceManager pm, String authzObj, + Set<String> paths) { + MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, authzObj); + if (mAuthzPathsMapping == null) { + mAuthzPathsMapping = new MAuthzPathsMapping(authzObj, paths); + } else { + for (String path : paths) { + mAuthzPathsMapping.addPath(new MPath(path)); + } + } + pm.makePersistent(mAuthzPathsMapping); + } + + /** + * Deletes a set of paths belongs to given authzObj from the authzObj -> [Paths] mapping. + * As well as persist the corresponding delta path change to MSentryPathChange + * table in a single transaction. + * + * @param authzObj an authzObj + * @param paths a set of paths need to be deleted from the authzObj -> [Paths] mapping + * @param update the corresponding path delta update + */ + public void deleteAuthzPathsMapping(final String authzObj, final Set<String> paths, + final Update update) throws Exception { + execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + deleteAuthzPathsMappingCore(pm, authzObj, paths); + return null; + } + }); + } + + /** + * Deletes a set of paths belongs to given authzObj from the authzObj -> [Paths] mapping. + * + * @param pm PersistenceManager + * @param authzObj an authzObj + * @param paths a set of paths need to be deleted from the authzObj -> [Paths] mapping. + * @throws SentryNoSuchObjectException if cannot find the existing authzObj or path. + */ + private void deleteAuthzPathsMappingCore(PersistenceManager pm, String authzObj, + Set<String> paths) { + MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, authzObj); + if (mAuthzPathsMapping != null) { + for (String path : paths) { + MPath mPath = mAuthzPathsMapping.getPath(path); + if (mPath == null) { + LOGGER.error("nonexistent path: " + path); + } else { + mAuthzPathsMapping.removePath(mPath); + pm.deletePersistent(mPath); + } + } + pm.makePersistent(mAuthzPathsMapping); + } else { + LOGGER.error("nonexistent authzObj: " + authzObj); + } + } + + /** + * Deletes all entries of the given authzObj from the authzObj -> [Paths] mapping. + * As well as persist the corresponding delta path change to MSentryPathChange + * table in a single transaction. + * + * @param authzObj an authzObj to be deleted + * @param update the corresponding path delta update + */ + public void deleteAllAuthzPathsMapping(final String authzObj, final Update update) + throws Exception { + execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + deleteAllAuthzPathsMappingCore(pm, authzObj); + return null; + } + }); + } + + /** + * Deletes the entry of the given authzObj from the authzObj -> [Paths] mapping. + * + * @param pm PersistenceManager + * @param authzObj an authzObj to be deleted + * @throws SentryNoSuchObjectException if cannot find the existing authzObj + */ + private void deleteAllAuthzPathsMappingCore(PersistenceManager pm, String authzObj) { + MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, authzObj); + if (mAuthzPathsMapping != null) { + for (MPath mPath : mAuthzPathsMapping.getPaths()) { + mAuthzPathsMapping.removePath(mPath); + pm.deletePersistent(mPath); + } + pm.deletePersistent(mAuthzPathsMapping); + } else { + LOGGER.error("nonexistent authzObj: " + authzObj); + } + } + + /** + * Renames the existing authzObj to a new one in the authzObj -> [Paths] mapping. + * And updates its existing path with a new path, while keeps the rest of its paths + * untouched if there is any. As well as persist the corresponding delta path + * change to MSentryPathChange table in a single transaction. + * + * @param oldObj the existing authzObj + * @param newObj the new name to be changed to + * @param oldPath a existing path of the given authzObj + * @param newPath a new path to be changed to + * @param update the corresponding path delta update + */ + public void renameAuthzPathsMapping(final String oldObj, final String newObj, + final String oldPath, final String newPath, final Update update) throws Exception { + execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + renameAuthzPathsMappingCore(pm, oldObj, newObj, oldPath, newPath); + return null; + } + }); + } + + /** + * Renames the existing authzObj to a new one in the authzObj -> [Paths] mapping. + * And updates its existing path with a new path, while keeps the rest of its paths + * untouched if there is any. + * + * @param pm PersistenceManager + * @param oldObj the existing authzObj + * @param newObj the new name to be changed to + * @param oldPath a existing path of the given authzObj + * @param newPath a new path to be changed to + * @throws SentryNoSuchObjectException if cannot find the existing authzObj or path. + */ + private void renameAuthzPathsMappingCore(PersistenceManager pm, String oldObj, + String newObj, String oldPath, String newPath) { + MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, oldObj); + if (mAuthzPathsMapping != null) { + MPath mOldPath = mAuthzPathsMapping.getPath(oldPath); + if (mOldPath == null) { + LOGGER.error("nonexistent path: " + oldPath); + } else { + mAuthzPathsMapping.removePath(mOldPath); + pm.deletePersistent(mOldPath); + } + mAuthzPathsMapping.addPath(new MPath(newPath)); + mAuthzPathsMapping.setAuthzObjName(newObj); + pm.makePersistent(mAuthzPathsMapping); + } else { + LOGGER.error("nonexistent authzObj: " + oldObj); + } + } + + /** + * Renames the existing authzObj to a new one in the authzObj -> [Paths] mapping, + * but keeps its paths mapping as-is. As well as persist the corresponding delta path + * change to MSentryPathChange table in a single transaction. + * + * @param oldObj the existing authzObj + * @param newObj the new name to be changed to + * @param update the corresponding path delta update + */ + public void renameAuthzObj(final String oldObj, final String newObj, + final Update update) throws Exception { + execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + renameAuthzObjCore(pm, oldObj, newObj); + return null; + } + }); + } + + /** + * Renames the existing authzObj to a new one in the authzObj -> [Paths] mapping, + * but keeps its paths mapping as-is. + * + * @param pm PersistenceManager + * @param oldObj the existing authzObj + * @param newObj the new name to be changed to + * @throws SentryNoSuchObjectException if cannot find the existing authzObj. + */ + private void renameAuthzObjCore(PersistenceManager pm, String oldObj, + String newObj) { + MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, oldObj); + if (mAuthzPathsMapping != null) { + mAuthzPathsMapping.setAuthzObjName(newObj); + pm.makePersistent(mAuthzPathsMapping); + } else { + LOGGER.error("nonexistent authzObj: " + oldObj); + } + } + + /** + * Updates authzObj -> [Paths] mapping to replace an existing path with a new one + * given an authzObj. As well as persist the corresponding delta path change to + * MSentryPathChange table in a single transaction. + * + * @param authzObj an authzObj + * @param oldPath the existing path maps to the given authzObj + * @param newPath a new path to replace the existing one + * @param update the corresponding path delta update + * @throws Exception + */ + public void updateAuthzPathsMapping(final String authzObj, final String oldPath, + final String newPath, final Update update) throws Exception { + execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + updateAuthzPathsMappingCore(pm, authzObj, oldPath, newPath); + return null; + } + }); + } + + /** + * Updates authzObj -> [Paths] mapping to replace an existing path with a new one + * given an authzObj. + * + * @param pm PersistenceManager + * @param authzObj an authzObj + * @param oldPath the existing path maps to the given authzObj + * @param newPath a non-empty path to replace the existing one + * @throws SentryNoSuchObjectException if no such path found + * in the authzObj -> [Paths] mapping. + */ + private void updateAuthzPathsMappingCore(PersistenceManager pm, String authzObj, + String oldPath, String newPath) { + + MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, authzObj); + if (mAuthzPathsMapping == null) { + mAuthzPathsMapping = new MAuthzPathsMapping(authzObj, Sets.newHashSet(newPath)); + } else { + MPath mOldPath = mAuthzPathsMapping.getPath(oldPath); + if (mOldPath == null) { + LOGGER.error("nonexistent path: " + oldPath); + } else { + mAuthzPathsMapping.removePath(mOldPath); + pm.deletePersistent(mOldPath); + } + MPath mNewPath = new MPath(newPath); + mAuthzPathsMapping.addPath(mNewPath); + } + pm.makePersistent(mAuthzPathsMapping); + } + + /** * Get the MAuthzPathsMapping object from authzObj */ - public MAuthzPathsMapping getMAuthzPathsMappingCore(PersistenceManager pm, String authzObj) { + private MAuthzPathsMapping getMAuthzPathsMappingCore(PersistenceManager pm, + String authzObj) { Query query = pm.newQuery(MAuthzPathsMapping.class); - query.setFilter("this.authzObjName == t"); - query.declareParameters("java.lang.String t"); + query.setFilter("this.authzObjName == authzObjName"); + query.declareParameters("java.lang.String authzObjName"); query.setUnique(true); return (MAuthzPathsMapping) query.execute(authzObj); } + @VisibleForTesting + List<MPath> getMPaths() throws Exception { + return tm.executeTransaction(new TransactionBlock<List<MPath>>() { + public List<MPath> execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MPath.class); + return (List<MPath>) query.execute(); + } + }); + } + /** * Method detects orphaned privileges * @@ -3193,6 +3472,24 @@ public class SentryStore { } /** + * Return exception for nonexistent path + * @param path path name + * @return SentryNoSuchObjectException with appropriate message + */ + private SentryNoSuchObjectException noSuchPath(final String path) { + return new SentryNoSuchObjectException("nonexistent path + " + path); + } + + /** + * Return exception for nonexistent authzObj + * @param authzObj an authzObj + * @return SentryNoSuchObjectException with appropriate message + */ + private SentryNoSuchObjectException noSuchAuthzObj(final String authzObj) { + return new SentryNoSuchObjectException("nonexistent authzObj + " + authzObj); + } + + /** * Fetch all {@link MSentryChange} in the database. * * @param cls the class of the Sentry delta change. @@ -3395,8 +3692,8 @@ public class SentryStore { /** * Execute Perm/Path UpdateTransaction and corresponding actual * action transaction, e.g dropSentryRole, in a single transaction. - * The order of the transaction does not matter because there is no - * any return value. + * Note that this method only applies to TransactionBlock that + * does not have any return value. * <p> * Failure in any TransactionBlock would cause the whole transaction * to fail. http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 1543379..57b7f88 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -19,6 +19,7 @@ package org.apache.sentry.service.thrift; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -254,18 +255,23 @@ public class HMSFollower implements Runnable { sentryStore.persistFullPathsImage(pathsFullSnapshot); } - NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); - if (response.isSetEvents()) { - if (!response.getEvents().isEmpty()) { - if (currentEventID != lastLoggedEventId) { - // Only log when there are updates and the notification ID has changed. - LOGGER.debug(String.format("CurrentEventID = %s. Processing %s events", + // HIVE-15761: Currently getNextNotification API may return an empty + // NotificationEventResponse causing TProtocolException. + // Workaround: Only processes the notification events newer than the last updated one. + CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); + if (eventId.getEventId() > currentEventID) { + NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); + if (response.isSetEvents()) { + if (!response.getEvents().isEmpty()) { + if (currentEventID != lastLoggedEventId) { + // Only log when there are updates and the notification ID has changed. + LOGGER.debug(String.format("CurrentEventID = %s. Processing %s events", currentEventID, response.getEvents().size())); + lastLoggedEventId = currentEventID; + } - lastLoggedEventId = currentEventID; + processNotificationEvents(response.getEvents()); } - - processNotificationEvents(response.getEvents()); } } } catch (TException e) { @@ -281,7 +287,9 @@ public class HMSFollower implements Runnable { "while processing notification log", e); } catch (Throwable t) { // catching errors to prevent the executor to halt. - LOGGER.error("Caught unexpected exception in HMSFollower!", t.getCause()); + LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(), + t.getCause()); + t.printStackTrace(); } } @@ -350,51 +358,40 @@ public class HMSFollower implements Runnable { /** * Throws SentryInvalidHMSEventException if Notification event contains insufficient information */ - void processNotificationEvents(List<NotificationEvent> events) throws - SentryInvalidHMSEventException, SentryInvalidInputException { + void processNotificationEvents(List<NotificationEvent> events) throws Exception { SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer(); final CounterWait counterWait = sentryStore.getCounterWait(); for (NotificationEvent event : events) { String dbName, tableName, oldLocation, newLocation, location; + List<String> locations; + NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER); switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { case CREATE_DATABASE: SentryJSONCreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(event.getMessage()); dbName = message.getDB(); - location = message.getLocation(); if (dbName == null || location == null) { - throw new SentryInvalidHMSEventException(String.format("Create database event has incomplete information. " + - "dbName = %s location = %s", dbName, location)); + throw new SentryInvalidHMSEventException(String.format("Create database event " + + "has incomplete information. dbName = %s location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(location, "null"))); } - if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { - try { - dropSentryDbPrivileges(dbName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Create database event. Event: " + event.toString(), e); - } - } - //TODO: HDFSPlugin.addPath(dbName, location) + dropSentryDbPrivileges(dbName, event); + notificationProcessor.processCreateDatabase(dbName,location, event.getEventId()); break; case DROP_DATABASE: - SentryJSONDropDatabaseMessage dropDatabaseMessage = deserializer.getDropDatabaseMessage(event.getMessage()); + SentryJSONDropDatabaseMessage dropDatabaseMessage = + deserializer.getDropDatabaseMessage(event.getMessage()); dbName = dropDatabaseMessage.getDB(); + location = dropDatabaseMessage.getLocation(); if (dbName == null) { - throw new SentryInvalidHMSEventException(String.format("Drop database event has incomplete information. " + - "dbName = null")); - } - if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { - try { - dropSentryDbPrivileges(dbName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Drop database event. Event: " + event.toString(), e); - } + throw new SentryInvalidHMSEventException(String.format("Drop database event " + + "has incomplete information. dbName = %s", + StringUtils.defaultIfBlank(dbName, "null"))); } - //TODO: HDFSPlugin.deletePath(dbName, location) + dropSentryDbPrivileges(dbName, event); + notificationProcessor.processDropDatabase(dbName, location, event.getEventId()); break; case CREATE_TABLE: SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage()); @@ -402,38 +399,27 @@ public class HMSFollower implements Runnable { tableName = createTableMessage.getTable(); location = createTableMessage.getLocation(); if (dbName == null || tableName == null || location == null) { - throw new SentryInvalidHMSEventException(String.format("Create table event has incomplete information. " + - "dbName = %s, tableName = %s, location = %s", dbName, tableName, location)); + throw new SentryInvalidHMSEventException(String.format("Create table event " + + "has incomplete information. dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(location, "null"))); } - if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { - try { - dropSentryTablePrivileges(dbName, tableName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Create table event. Event: " + event.toString(), e); - } - } - //TODO: HDFSPlugin.deletePath(dbName, location) + dropSentryTablePrivileges(dbName, tableName, event); + notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId()); break; case DROP_TABLE: SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage()); dbName = dropTableMessage.getDB(); tableName = dropTableMessage.getTable(); if (dbName == null || tableName == null) { - throw new SentryInvalidHMSEventException(String.format("Drop table event has incomplete information. " + - "dbName = %s, tableName = %s", dbName, tableName)); - } - if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { - try{ - dropSentryTablePrivileges(dbName, tableName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Drop table event. Event: " + event.toString(), e); - } + throw new SentryInvalidHMSEventException(String.format("Drop table event " + + "has incomplete information. dbName = %s, tableName = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"))); } - //TODO: HDFSPlugin.deletePath(dbName, location) + dropSentryTablePrivileges(dbName, tableName, event); + notificationProcessor.processDropTable(dbName, tableName, event.getEventId()); break; case ALTER_TABLE: SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage()); @@ -447,46 +433,84 @@ public class HMSFollower implements Runnable { if (oldDbName == null || oldTableName == null || newDbName == null || newTableName == null || oldLocation == null || newLocation == null) { - throw new SentryInvalidHMSEventException(String.format("Alter table event has incomplete information. " + - "oldDbName = %s, oldTableName = %s, oldLocation = %s, newDbName = %s, newTableName = %s, newLocation = %s", - oldDbName, oldTableName, oldLocation, newDbName, newTableName, newLocation)); + throw new SentryInvalidHMSEventException(String.format("Alter table event " + + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " + + "newDbName = %s, newTableName = %s, newLocation = %s", + StringUtils.defaultIfBlank(oldDbName, "null"), + StringUtils.defaultIfBlank(oldTableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newDbName, "null"), + StringUtils.defaultIfBlank(newTableName, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); } - if(!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { // Name has changed - if(!oldLocation.equals(newLocation)) { // Location has changed - - //Name and path has changed - // - Alter table rename for managed table - //TODO: Handle HDFS plugin - - } else { - //Only name has changed - // - Alter table rename for an external table - //TODO: Handle HDFS plugin - - } + if(!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { + // Name has changed try { renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); } catch (SentryNoSuchObjectException e) { - LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s", oldDbName, oldTableName); + LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s", + oldDbName, oldTableName); } catch (Exception e) { throw new SentryInvalidInputException("Could not process Alter table event. Event: " + event.toString(), e); } - } else if(!oldLocation.equals(newLocation)) { // Only Location has changed{ - //- Alter table set location - //TODO: Handle HDFS plugin - } else { - LOGGER.info(String.format("Alter table notification ignored as neither name nor location has changed: " + - "oldDbName = %s, oldTableName = %s, oldLocation = %s, newDbName = %s, newTableName = %s, newLocation = %s", - oldDbName, oldTableName, oldLocation, newDbName, newTableName, newLocation)); } - //TODO: Write test cases for all these cases + notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName, + newTableName, oldLocation, newLocation, event.getEventId()); break; case ADD_PARTITION: + SentryJSONAddPartitionMessage addPartitionMessage = + deserializer.getAddPartitionMessage(event.getMessage()); + dbName = addPartitionMessage.getDB(); + tableName = addPartitionMessage.getTable(); + locations = addPartitionMessage.getLocations(); + if (dbName == null || tableName == null || locations == null) { + LOGGER.error(String.format("Create table event has incomplete information. " + + "dbName = %s, tableName = %s, locations = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + } + notificationProcessor.processAddPartition(dbName, tableName, locations, + event.getEventId()); + break; case DROP_PARTITION: - case ALTER_PARTITION: - //TODO: Handle HDFS plugin + SentryJSONDropPartitionMessage dropPartitionMessage = + deserializer.getDropPartitionMessage(event.getMessage()); + dbName = dropPartitionMessage.getDB(); + tableName = dropPartitionMessage.getTable(); + locations = dropPartitionMessage.getLocations(); + if (dbName == null || tableName == null || locations == null) { + throw new SentryInvalidHMSEventException(String.format("Drop partition event " + + "has incomplete information. dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + } + notificationProcessor.processDropPartition(dbName, tableName, locations, + event.getEventId()); break; + case ALTER_PARTITION: + SentryJSONAlterPartitionMessage alterPartitionMessage = + deserializer.getAlterPartitionMessage(event.getMessage()); + dbName = alterPartitionMessage.getDB(); + tableName = alterPartitionMessage.getTable(); + oldLocation = alterPartitionMessage.getOldLocation(); + newLocation = alterPartitionMessage.getNewLocation(); + + if (dbName == null || tableName == null || oldLocation == null || newLocation == null) { + throw new SentryInvalidHMSEventException(String.format("Alter partition event " + + "has incomplete information. dbName = %s, tableName = %s, " + + "oldLocation = %s, newLocation = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + } + + notificationProcessor.processAlterPartition(dbName, tableName, oldLocation, + newLocation, event.getEventId()); + break; } currentEventID = event.getEventId(); // Wake up any HMS waiters that are waiting for this ID. @@ -498,17 +522,38 @@ public class HMSFollower implements Runnable { } } - private void dropSentryDbPrivileges(String dbName) throws Exception { - TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); - authorizable.setDb(dbName); - sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); + private void dropSentryDbPrivileges(String dbName, NotificationEvent event) throws Exception { + if (!syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { + return; + } else { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setDb(dbName); + sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName); + } catch (Exception e) { + throw new SentryInvalidInputException("Could not process Drop database event." + + "Event: " + event.toString(), e); + } + } } - private void dropSentryTablePrivileges(String dbName, String tableName) throws Exception { - TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); - authorizable.setDb(dbName); - authorizable.setTable(tableName); - sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); + private void dropSentryTablePrivileges(String dbName, String tableName, NotificationEvent event) throws Exception { + if (!syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { + return; + } else { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName); + } catch (Exception e) { + throw new SentryInvalidInputException("Could not process Create table event. Event: " + event.toString(), e); + } + } } private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java new file mode 100644 index 0000000..84574f0 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.service.thrift; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.sentry.hdfs.PathsUpdate; +import org.apache.sentry.hdfs.SentryMalformedPathException; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.slf4j.Logger; + +import java.util.*; + +/** + * NotificationProcessor processes various notification events generated from + * the Hive MetaStore state change, and applies these changes on the complete + * HMS Paths snapshot or delta update stored in Sentry using SentryStore. + */ +public class NotificationProcessor { + + private final Logger LOGGER; + private final SentryStore sentryStore; + + NotificationProcessor(SentryStore sentryStore, Logger LOGGER) { + this.LOGGER = LOGGER; + this.sentryStore = sentryStore; + } + + /** + * Processes "create database" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param dbName database name + * @param location database location + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processCreateDatabase(String dbName, String location, long seqNum) throws Exception { + List<String> locations = Collections.singletonList(location); + addPaths(dbName, locations, seqNum); + } + + /** + * Processes "drop database" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param dbName database name + * @param location database location + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processDropDatabase(String dbName, String location, long seqNum) throws Exception { + List<String> locations = Collections.singletonList(location); + removePaths(dbName, locations, seqNum); + } + + /** + * Processes "create table" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param dbName database name + * @param tableName table name + * @param location table location + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processCreateTable(String dbName, String tableName, String location, long seqNum) + throws Exception { + String authzObj = dbName + "." + tableName; + List<String> locations = Collections.singletonList(location); + addPaths(authzObj, locations, seqNum); + } + + /** + * Processes "drop table" notification event. It drops all partitions belongs to + * the table as well. And applies its corresponding snapshot change as well + * as delta path update into Sentry DB. + * + * @param dbName database name + * @param tableName table name + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processDropTable(String dbName, String tableName, long seqNum) throws Exception { + String authzObj = dbName + "." + tableName; + removeAllPaths(authzObj, seqNum); + } + + /** + * Processes "alter table" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param oldDbName old database name + * @param newDbName new database name + * @param oldTableName old table name + * @param newTableName new table name + * @param oldLocation old table location + * @param newLocation new table location + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processAlterTable(String oldDbName, String newDbName, String oldTableName, + String newTableName, String oldLocation, String newLocation, long seqNum) + throws Exception { + String oldAuthzObj = oldDbName + "." + oldTableName; + String newAuthzObj = newDbName + "." + newTableName; + renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, seqNum); + } + + /** + * Processes "add partition" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param dbName database name + * @param tableName table name + * @param locations partition locations + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processAddPartition(String dbName, String tableName, List<String> locations, long seqNum) + throws Exception { + String authzObj = dbName + "." + tableName; + addPaths(authzObj, locations, seqNum); + } + + /** + * Processes "drop partition" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param dbName database name + * @param tableName table name + * @param locations partition locations + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processDropPartition(String dbName, String tableName, List<String> locations, long seqNum) + throws Exception { + String authzObj = dbName + "." + tableName; + removePaths(authzObj, locations, seqNum); + } + + /** + * Processes "alter partition" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param dbName database name + * @param tableName table name + * @param oldLocation old partition location + * @param newLocation new partition location + * @param seqNum notification event ID + * @throws Exception if encounters errors while persisting the path change + */ + void processAlterPartition(String dbName, String tableName, String oldLocation, + String newLocation, long seqNum) throws Exception { + String oldAuthzObj = dbName + "." + tableName; + renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, seqNum); + } + + /** + * Adds an authzObj along with a set of paths into the authzObj -> [Paths] mapping + * as well as persist the corresponding delta path change to Sentry DB. + * + * @param authzObj the given authzObj + * @param locations a set of paths need to be added + * @param seqNum notification event ID + * @throws Exception + */ + private void addPaths(String authzObj, List<String> locations, long seqNum) + throws Exception { + // AuthzObj is case insensitive + authzObj = authzObj.toLowerCase(); + + PathsUpdate update = new PathsUpdate(seqNum, false); + Set<String> paths = new HashSet<>(); + // addPath and persist into Sentry DB. + // Skip update if encounter malformed path. + for (String location : locations) { + List<String> pathTree = getPath(location); + if (pathTree == null) { + LOGGER.debug("#### HMS Path Update [" + + "OP : addPath, " + + "authzObj : " + authzObj + ", " + + "path : " + location + "] - nothing to add" + ", " + + "notification event ID: " + seqNum + "]"); + } else { + LOGGER.debug("#### HMS Path Update [" + + "OP : addPath, " + "authzObj : " + + authzObj + ", " + + "path : " + location + ", " + + "notification event ID: " + seqNum + "]"); + update.newPathChange(authzObj).addToAddPaths(pathTree); + paths.add(PathsUpdate.concatenatePath(pathTree)); + } + } + sentryStore.addAuthzPathsMapping(authzObj, paths, update); + } + + /** + * Removes a set of paths map to a given authzObj from the authzObj -> [Paths] mapping + * as well as persist the corresponding delta path change to Sentry DB. + * + * @param authzObj the given authzObj + * @param locations a set of paths need to be removed + * @param seqNum notification event ID + * @throws Exception + */ + private void removePaths(String authzObj, List<String> locations, long seqNum) + throws Exception { + // AuthzObj is case insensitive + authzObj = authzObj.toLowerCase(); + + PathsUpdate update = new PathsUpdate(seqNum, false); + Set<String> paths = new HashSet<>(); + for (String location : locations) { + List<String> pathTree = getPath(location); + if (pathTree == null) { + LOGGER.debug("#### HMS Path Update [" + + "OP : removePath, " + + "authzObj : " + authzObj + ", " + + "path : " + location + "] - nothing to remove" + ", " + + "notification event ID: " + seqNum + "]"); + } else { + LOGGER.debug("#### HMS Path Update [" + + "OP : removePath, " + + "authzObj : " + authzObj + ", " + + "path : " + location + ", " + + "notification event ID: " + seqNum + "]"); + update.newPathChange(authzObj).addToDelPaths(pathTree); + paths.add(PathsUpdate.concatenatePath(pathTree)); + } + } + sentryStore.deleteAuthzPathsMapping(authzObj, paths, update); + } + + /** + * Removes a given authzObj and all paths belongs to it from the + * authzObj -> [Paths] mapping as well as persist the corresponding + * delta path change to Sentry DB. + * + * @param authzObj the given authzObj to be deleted + * @param seqNum notification event ID + * @throws Exception + */ + private void removeAllPaths(String authzObj, long seqNum) + throws Exception { + // AuthzObj is case insensitive + authzObj = authzObj.toLowerCase(); + + LOGGER.debug("#### HMS Path Update [" + + "OP : removeAllPaths, " + + "authzObj : " + authzObj + ", " + + "notification event ID: " + seqNum + "]"); + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(authzObj).addToDelPaths( + Lists.newArrayList(PathsUpdate.ALL_PATHS)); + sentryStore.deleteAllAuthzPathsMapping(authzObj, update); + } + + /** + * Renames a given authzObj and alter the paths belongs to it from the + * authzObj -> [Paths] mapping as well as persist the corresponding + * delta path change to Sentry DB. + * + * @param oldAuthzObj the existing authzObj + * @param newAuthzObj the new name to be changed to + * @param oldLocation a existing path of the given authzObj + * @param newLocation a new path to be changed to + * @param seqNum + * @throws Exception + */ + private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation, + String newLocation, long seqNum) throws Exception { + // AuthzObj is case insensitive + oldAuthzObj = oldAuthzObj.toLowerCase(); + newAuthzObj = newAuthzObj.toLowerCase(); + List<String> oldPathTree = getPath(oldLocation); + List<String> newPathTree = getPath(newLocation); + + LOGGER.debug("#### HMS Path Update [" + + "OP : renameAuthzObject, " + + "oldAuthzObj : " + oldAuthzObj + ", " + + "newAuthzObj : " + newAuthzObj + ", " + + "oldLocation : " + oldLocation + ", " + + "newLocation : " + newLocation + ", " + + "notification event ID: " + seqNum + "]"); + + // In the case of HiveObj name has changed + if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) { + // Skip update if encounter malformed path for both oldLocation and newLocation. + if (oldPathTree != null && newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree); + update.newPathChange(newAuthzObj).addToAddPaths(newPathTree); + if (!oldLocation.equals(newLocation)) { + // Both name and location has changed + // - Alter table rename for managed table + sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, + PathsUpdate.concatenatePath(oldPathTree), + PathsUpdate.concatenatePath(newPathTree), + update); + } else { + //Only name has changed + // - Alter table rename for an external table + sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update); + } + } else if (oldPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree); + sentryStore.deleteAuthzPathsMapping(oldAuthzObj, + Sets.newHashSet(PathsUpdate.concatenatePath(oldPathTree)), + update); + } else if (newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(newAuthzObj).addToAddPaths(newPathTree); + sentryStore.addAuthzPathsMapping(newAuthzObj, + Sets.newHashSet(PathsUpdate.concatenatePath(newPathTree)), + update); + } + } else if (!oldLocation.equals(newLocation)) { + // Only Location has changed, e.g. Alter table set location + if (oldPathTree != null && newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree); + update.newPathChange(oldAuthzObj).addToAddPaths(newPathTree); + sentryStore.updateAuthzPathsMapping(oldAuthzObj, PathsUpdate.concatenatePath(oldPathTree), + PathsUpdate.concatenatePath(newPathTree), update); + } else if (oldPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree); + sentryStore.deleteAuthzPathsMapping(oldAuthzObj, + Sets.newHashSet(PathsUpdate.concatenatePath(oldPathTree)), + update); + } else if (newPathTree != null) { + PathsUpdate update = new PathsUpdate(seqNum, false); + update.newPathChange(oldAuthzObj).addToAddPaths(newPathTree); + sentryStore.addAuthzPathsMapping(oldAuthzObj, + Sets.newHashSet(PathsUpdate.concatenatePath(newPathTree)), + update); + } + } else { + LOGGER.info(String.format("Alter table notification ignored as neither name nor " + + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + + "newLocation = %s", oldAuthzObj, oldLocation, newAuthzObj, newLocation)); + } + } + + /** + * Get path tree from a given path. It return null if encounters + * SentryMalformedPathException which indicates a malformed path. + * + * @param path a path + * @return the path tree given a path. + */ + private List<String> getPath(String path) { + try { + return PathsUpdate.parsePath(path); + } catch (SentryMalformedPathException e) { + LOGGER.error("Unexpected path while parsing, " + path, e.getMessage()); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index 87e3295..d12b0da 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.List; +import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.alias.CredentialProvider; @@ -42,11 +43,13 @@ import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.core.common.exception.SentryAlreadyExistsException; import org.apache.sentry.core.common.exception.SentryGrantDeniedException; import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; +import org.apache.sentry.hdfs.PathsUpdate; import org.apache.sentry.hdfs.PermissionsUpdate; import org.apache.sentry.hdfs.Updateable; import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; import org.apache.sentry.hdfs.service.thrift.TRoleChanges; import org.apache.sentry.provider.db.service.model.MSentryPermChange; +import org.apache.sentry.provider.db.service.model.MSentryPathChange; import org.apache.sentry.provider.db.service.model.MSentryPrivilege; import org.apache.sentry.provider.db.service.model.MSentryRole; import org.apache.sentry.provider.db.service.thrift.TSentryActiveRoleSet; @@ -2410,66 +2413,169 @@ public class TestSentryStore extends org.junit.Assert { assertEquals(2, roles.size()); } - /* - Makes sure that Authorizable object could be associated with multiple paths and can be properly persisted into - database. - */ + /** + * Verifies complete snapshot of HMS Paths can be persisted and retrieved properly. + */ @Test - public void testAuthzPathsMapping() throws Exception { - sentryStore.createAuthzPathsMapping("db1.table1", Sets.newHashSet("/user/hive/warehouse/db1.db/table1.1","/user/hive/warehouse/db1.db/table1.2")); - sentryStore.createAuthzPathsMapping("db1.table2", Sets.newHashSet("/user/hive/warehouse/db1.db/table2.1","/user/hive/warehouse/db1.db/table2.2")); - - PathsImage pathsImage = sentryStore.retrieveFullPathsImage(); - Map<String, Set<String>> pathImage = pathsImage.getPathImage(); - assertEquals(2, pathImage.size()); - for (Map.Entry<String, Set<String>> entry : pathImage.entrySet()) { - assertEquals(2, entry.getValue().size()); - } - assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table1.1","/user/hive/warehouse/db1.db/table1.2"), pathImage.get("db1.table1")); - assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table2.1","/user/hive/warehouse/db1.db/table2.2"), pathImage.get("db1.table2")); - + public void testPersistFullPathsImage() throws Exception { Map<String, Set<String>> authzPaths = new HashMap<>(); - authzPaths.put("db2.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1","/user/hive/warehouse/db2.db/table1.2")); - authzPaths.put("db2.table2", Sets.newHashSet("/user/hive/warehouse/db2.db/table2.1","/user/hive/warehouse/db2.db/table2.2")); + // Makes sure that authorizable object could be associated + // with different paths and can be properly persisted into database. + authzPaths.put("db1.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1", + "/user/hive/warehouse/db2.db/table1.2")); + // Makes sure that the same MPaths object could be associated + // with multiple authorizable and can be properly persisted into database. + authzPaths.put("db1.table2", Sets.newHashSet("/user/hive/warehouse/db2.db/table2.1", + "/user/hive/warehouse/db2.db/table2.2")); + authzPaths.put("db2.table2", Sets.newHashSet("/user/hive/warehouse/db2.db/table2.1", + "/user/hive/warehouse/db2.db/table2.3")); sentryStore.persistFullPathsImage(authzPaths); - pathsImage = sentryStore.retrieveFullPathsImage(); - pathImage = pathsImage.getPathImage(); - assertEquals(4, pathImage.size()); + PathsImage pathsImage = sentryStore.retrieveFullPathsImage(); + Map<String, Set<String>> pathImage = pathsImage.getPathImage(); + assertEquals(3, pathImage.size()); for (Map.Entry<String, Set<String>> entry : pathImage.entrySet()) { assertEquals(2, entry.getValue().size()); } - assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1","/user/hive/warehouse/db2.db/table1.2"), pathImage.get("db2.table1")); - assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table2.1","/user/hive/warehouse/db2.db/table2.2"), pathImage.get("db2.table2")); + assertEquals(2, pathImage.get("db2.table2").size()); + assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1", + "/user/hive/warehouse/db2.db/table1.2"), + pathImage.get("db1.table1")); + assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table2.1", + "/user/hive/warehouse/db2.db/table2.2"), + pathImage.get("db1.table2")); + assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table2.1", + "/user/hive/warehouse/db2.db/table2.3"), + pathImage.get("db2.table2")); + assertEquals(6, sentryStore.getMPaths().size()); } - /* - Makes sure that Authorizable object could be associated with multiple files and are associated with other Authorizable - objects and can be properly persisted into database. - */ @Test - public void testAuthzSharedPathsMapping() throws Exception { - sentryStore.createAuthzPathsMapping("db1.table1", Sets.newHashSet("/user/hive/warehouse/db1.db/table1.1","/user/hive/warehouse/db1.db/table1.2")); - + public void testAddDeleteAuthzPathsMapping() throws Exception { + // Add "db1.table1" authzObj + PathsUpdate addUpdate = new PathsUpdate(0, false); + addUpdate.newPathChange("db1.table"). + addToAddPaths(Arrays.asList("db1", "tbl1")); + addUpdate.newPathChange("db1.table"). + addToAddPaths(Arrays.asList("db1", "tbl2")); + sentryStore.addAuthzPathsMapping("db1.table", + Sets.newHashSet("db1/tbl1", "db1/tbl2"), addUpdate); PathsImage pathsImage = sentryStore.retrieveFullPathsImage(); Map<String, Set<String>> pathImage = pathsImage.getPathImage(); assertEquals(1, pathImage.size()); - for (Map.Entry<String, Set<String>> entry : pathImage.entrySet()) { - assertEquals(2, entry.getValue().size()); - } - assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table1.1","/user/hive/warehouse/db1.db/table1.2"), pathImage.get("db1.table1")); + assertEquals(2, pathImage.get("db1.table").size()); + assertEquals(2, sentryStore.getMPaths().size()); + + // Query the persisted path change and ensure it equals to the original one + long lastChangeID = sentryStore.getLastProcessedPathChangeID(); + MSentryPathChange addPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(addUpdate.JSONSerialize(), addPathChange.getPathChange()); + + // Delete path 'db1.db/tbl1' from "db1.table1" authzObj. + PathsUpdate delUpdate = new PathsUpdate(1, false); + delUpdate.newPathChange("db1.table") + .addToDelPaths(Arrays.asList("db1", "tbl1")); + sentryStore.deleteAuthzPathsMapping("db1.table", Sets.newHashSet("db1/tbl1"), delUpdate); + pathImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(1, pathImage.size()); + assertEquals(1, pathImage.get("db1.table").size()); + assertEquals(1, sentryStore.getMPaths().size()); + // Query the persisted path change and ensure it equals to the original one + lastChangeID = sentryStore.getLastProcessedPathChangeID(); + MSentryPathChange delPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(delUpdate.JSONSerialize(), delPathChange.getPathChange()); + + // Delete "db1.table" authzObj from the authzObj -> [Paths] mapping. + PathsUpdate delAllupdate = new PathsUpdate(2, false); + delAllupdate.newPathChange("db1.table") + .addToDelPaths(Lists.newArrayList(PathsUpdate.ALL_PATHS)); + sentryStore.deleteAllAuthzPathsMapping("db1.table", delAllupdate); + pathImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(0, pathImage.size()); + assertEquals(0, sentryStore.getMPaths().size()); + + // Query the persisted path change and ensure it equals to the original one + lastChangeID = sentryStore.getLastProcessedPathChangeID(); + MSentryPathChange delAllPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(delAllupdate.JSONSerialize(), delAllPathChange.getPathChange()); + } + + @Test + public void testRenameUpdateAuthzPathsMapping() throws Exception { Map<String, Set<String>> authzPaths = new HashMap<>(); - authzPaths.put("db2.table1", Sets.newHashSet("/user/hive/warehouse/db1.db/table1.1","/user/hive/warehouse/db1.db/table1.2")); + authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", + "user/hive/warehouse/db1.db/table1/p1")); + authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2")); sentryStore.persistFullPathsImage(authzPaths); - pathsImage = sentryStore.retrieveFullPathsImage(); - pathImage = pathsImage.getPathImage(); - assertEquals(2, pathImage.size()); - for (Map.Entry<String, Set<String>> entry : pathImage.entrySet()) { - assertEquals(2, entry.getValue().size()); - } - assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table1.1","/user/hive/warehouse/db1.db/table1.2"), pathImage.get("db2.table1")); + Map<String, Set<String>> pathsImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(2, pathsImage.size()); + + // Rename path of 'db1.table1' from 'db1.table1' to 'db1.newTable1' + PathsUpdate renameUpdate = new PathsUpdate(0, false); + renameUpdate.newPathChange("db1.table1") + .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "table1")); + renameUpdate.newPathChange("db1.newTable1") + .addToAddPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1")); + sentryStore.renameAuthzPathsMapping("db1.table1", "db1.newTable1", + "user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/newTable1", renameUpdate); + pathsImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(2, pathsImage.size()); + assertEquals(3, sentryStore.getMPaths().size()); + assertTrue(pathsImage.containsKey("db1.newTable1")); + assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1", + "user/hive/warehouse/db1.db/newTable1"), + pathsImage.get("db1.newTable1")); + + // Query the persisted path change and ensure it equals to the original one + long lastChangeID = sentryStore.getLastProcessedPathChangeID(); + MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); + + // Rename 'db1.table1' to "db1.table2" but did not change its location. + renameUpdate = new PathsUpdate(1, false); + renameUpdate.newPathChange("db1.newTable1") + .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1")); + renameUpdate.newPathChange("db1.newTable2") + .addToAddPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1")); + sentryStore.renameAuthzObj("db1.newTable1", "db1.newTable2", renameUpdate); + pathsImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(2, pathsImage.size()); + assertEquals(3, sentryStore.getMPaths().size()); + assertTrue(pathsImage.containsKey("db1.newTable2")); + assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1", + "user/hive/warehouse/db1.db/newTable1"), + pathsImage.get("db1.newTable2")); + + // Query the persisted path change and ensure it equals to the original one + lastChangeID = sentryStore.getLastProcessedPathChangeID(); + renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); + + // Update path of 'db1.newTable2' from 'db1.newTable1' to 'db1.newTable2' + PathsUpdate update = new PathsUpdate(3, false); + update.newPathChange("db1.newTable1") + .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1")); + update.newPathChange("db1.newTable1") + .addToAddPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable2")); + sentryStore.updateAuthzPathsMapping("db1.newTable2", + "user/hive/warehouse/db1.db/newTable1", + "user/hive/warehouse/db1.db/newTable2", + update); + pathsImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(2, pathsImage.size()); + assertEquals(3, sentryStore.getMPaths().size()); + assertTrue(pathsImage.containsKey("db1.newTable2")); + assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1", + "user/hive/warehouse/db1.db/newTable2"), + pathsImage.get("db1.newTable2")); + + // Query the persisted path change and ensure it equals to the original one + lastChangeID = sentryStore.getLastProcessedPathChangeID(); + MSentryPathChange updatePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(update.JSONSerialize(), updatePathChange.getPathChange()); } + @Test public void testQueryParamBuilder() { QueryParamBuilder paramBuilder; paramBuilder = newQueryParamBuilder(); @@ -2572,6 +2678,7 @@ public class TestSentryStore extends org.junit.Assert { // Query the persisted perm change and ensure it equals to the original one long lastChangeID = sentryStore.getLastProcessedPermChangeID(); + long initialID = lastChangeID; MSentryPermChange addPermChange = sentryStore.getMSentryPermChangeByID(lastChangeID); assertEquals(addUpdate.JSONSerialize(), addPermChange.getPermChange()); @@ -2596,7 +2703,7 @@ public class TestSentryStore extends org.junit.Assert { // Verify getMSentryPermChanges will return all MSentryPermChanges up // to the given changeID. - List<MSentryPermChange> mSentryPermChanges = sentryStore.getMSentryPermChanges(1); + List<MSentryPermChange> mSentryPermChanges = sentryStore.getMSentryPermChanges(initialID); assertEquals(lastChangeID, mSentryPermChanges.size()); // Verify ifPermChangeExists will return true for persisted MSentryPermChange. http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java index 24ab1a8..e771ce7 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java @@ -31,7 +31,6 @@ import org.apache.sentry.hdfs.PathsUpdate; import org.apache.sentry.tests.e2e.hive.StaticUserGroup; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -42,7 +41,6 @@ import org.apache.hadoop.hive.metastore.api.Table; /** * Advanced tests for HDFS Sync integration */ -@Ignore public class TestHDFSIntegrationAdvanced extends TestHDFSIntegrationBase { private static final Logger LOGGER = LoggerFactory @@ -566,7 +564,6 @@ public class TestHDFSIntegrationAdvanced extends TestHDFSIntegrationBase { /* SENTRY-1471 - fixing the validation logic revealed that this test is broken. * Disabling this test for now; to be fixed in a separate JIRA. */ - @Ignore @Test public void testAuthzObjOnPartitionMultipleTables() throws Throwable { String dbName = "db1"; @@ -602,7 +599,7 @@ public class TestHDFSIntegrationAdvanced extends TestHDFSIntegrationBase { // Create external table tab2 and partition on location '/tmp/external'. // Create tab2_role, and grant it with select permission on table tab2 to user_group2. stmt.execute("create external table tab2 (s string) partitioned by (month int)"); - stmt.execute("alter table tab2 add partition (month = 1) location '" + tmpHDFSPartitionStr + "'"); + stmt.execute("alter table tab2 add partition (month = 1) location '" + tmpHDFSDirStr + "'"); stmt.execute("create role tab2_role"); stmt.execute("grant select on table tab2 to role tab2_role"); stmt.execute("grant role tab2_role to group " + StaticUserGroup.USERGROUP2); http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java index 7769f24..b5247d0 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java @@ -497,7 +497,8 @@ public abstract class TestHDFSIntegrationBase { hiveConf.set("hive.metastore.authorization.storage.checks", "true"); hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort); hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding"); - hiveConf.set("hive.metastore.transactional.event.listeners", "org.apache.hive.hcatalog.listener.DbNotificationListener"); + hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListenerNotificationLog"); + hiveConf.set("hcatalog.message.factory.impl.json", "org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory"); hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl"); hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook"); hiveConf.set("sentry.metastore.service.users", "hive");// queries made by hive user (beeline) skip meta store check @@ -698,6 +699,8 @@ public abstract class TestHDFSIntegrationBase { "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); properties.put(ServerConfig.SENTRY_NOTIFICATION_LOG_ENABLED,"true"); properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin"); + properties.put(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS, "10000"); + properties.put(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, "50"); properties.put(ServerConfig.RPC_MIN_THREADS, "3"); for (Map.Entry<String, String> entry : properties.entrySet()) { sentryConf.set(entry.getKey(), entry.getValue()); http://git-wip-us.apache.org/repos/asf/sentry/blob/a5e772b5/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java index 0e97466..1ace07c 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationEnd2End.java @@ -40,11 +40,11 @@ import java.util.ArrayList; /** * This test class includes all HDFS Sync smoke tests */ -@Ignore public class TestHDFSIntegrationEnd2End extends TestHDFSIntegrationBase { private static final Logger LOGGER = LoggerFactory .getLogger(TestHDFSIntegrationEnd2End.class); + @Ignore @Test public void testEnd2End() throws Throwable { tmpHDFSDir = new Path("/tmp/external");
