SENTRY-1612 Change-Id: If429dee0836cc3ceff9ff276aec670c631fc27d1
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/813d10e8 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/813d10e8 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/813d10e8 Branch: refs/heads/sentry-ha-redesign-1 Commit: 813d10e86aeb35fb6c89c983761c194cb6b0ac7b Parents: dbd5870 Author: hahao <hao....@cloudera.com> Authored: Thu Jan 26 17:54:19 2017 -0800 Committer: hahao <hao....@cloudera.com> Committed: Thu Jan 26 17:54:19 2017 -0800 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 33 +++++++++--- .../sentry/hdfs/TestFullUpdateInitializer.java | 29 ++++------ .../db/service/persistent/SentryStore.java | 57 ++++++++++++++++++-- .../sentry/service/thrift/HMSFollower.java | 42 +++++++++++---- 4 files changed, 121 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index a1f970b..c990c53 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -19,6 +19,7 @@ package org.apache.sentry.hdfs; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -30,8 +31,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -296,9 +296,7 @@ public class FullUpdateInitializer implements Closeable { ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT); } - public UpdateableAuthzPaths createInitialUpdate() throws Exception { - UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new - String[]{"/"}); + public Map<String, Set<String>> createInitialUpdate() throws Exception { PathsUpdate tempUpdate = new PathsUpdate(-1, false); List<String> allDbStr = client.getAllDatabases(); for (String dbName : allDbStr) { @@ -321,11 +319,32 @@ public class FullUpdateInitializer implements Closeable { } } - authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock()); - return authzPaths; + return getAuthzObjToPathMapping(tempUpdate); } + /** + * Parsing pathsUpdate to get the mapping of hiveObj -> [Paths]. + * Only processing AddPaths, since in {@link FullUpdateInitializer} only + * adds paths when fetching full HMS Paths snapshot. + * + * @return mapping of hiveObj -> [Paths]. + */ + private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) { + Map<String, Set<String>> authzObjToPath = new HashMap<>(); + for (TPathChanges pathChanges : pathsUpdate.getPathChanges()) { + // Only processing AddPaths + List<List<String>> addPaths = pathChanges.getAddPaths(); + Set<String> paths = Sets.newHashSet(); + for (List<String> addPath : addPaths) { + paths.add(PathsUpdate.getPath(addPath)); + } + authzObjToPath.put(pathChanges.getAuthzObj(), paths); + } + + return authzObjToPath; + } + @Override public void close() throws IOException { if (threadPool != null) { http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java index 0bb6f66..792b847 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java @@ -18,6 +18,7 @@ package org.apache.sentry.hdfs; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -28,9 +29,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; +import java.util.*; public class TestFullUpdateInitializer { @@ -107,22 +106,14 @@ public class TestFullUpdateInitializer { FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf); - UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate(); - - Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new - String[]{"db1"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new - String[]{"db2"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new - String[]{"db2", "tab21"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new - String[]{"db3"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new - String[]{"db3", "tab31"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new - String[]{"db3", "tab31", "part311"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new - String[]{"db3", "tab31", "part312"})); + Map<String, Set<String>> update = cacheInitializer.createInitialUpdate(); + + Assert.assertEquals(update.get("db1"), Sets.newHashSet("db1")); + Assert.assertEquals(update.get("db2"), Sets.newHashSet("db2")); + Assert.assertEquals(update.get("db2.tab21"), Sets.newHashSet("db2/tab21")); + Assert.assertEquals(update.get("db3.tab31"), Sets.newHashSet("db3/tab31", + "db3/tab31/part311", "db3/tab31/part312")); + cacheInitializer.close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 46e23c6..3536579 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -49,7 +49,6 @@ import org.apache.sentry.core.common.exception.*; import org.apache.sentry.core.common.utils.SentryConstants; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType; -import org.apache.sentry.hdfs.Updateable; import org.apache.sentry.provider.db.service.model.*; import org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor; import org.apache.sentry.provider.db.service.thrift.TSentryActiveRoleSet; @@ -105,7 +104,7 @@ public class SentryStore { // is starting from 1. public static final long INIT_CHANGE_ID = 1L; - private static final long EMPTY_CHANGE_ID = 0L; + public static final long EMPTY_CHANGE_ID = 0L; // For counters, representation of the "unknown value" private static final long COUNT_VALUE_UNKNOWN = -1; @@ -2300,6 +2299,24 @@ public class SentryStore { return retVal; } + /** + * Persist a full hive snapshot into Sentry DB in a single transaction. + * + * @param authzPaths Mapping of hiveObj -> [Paths] + * @throws Exception + */ + public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception { + tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) { + createAuthzPathsMappingCore(pm, authzPath.getKey(), authzPath.getValue()); + } + return null; + } + }); + } + public void createAuthzPathsMapping(final String hiveObj, final Set<String> paths) throws Exception { tm.executeTransactionWithRetry( @@ -3339,10 +3356,10 @@ public class SentryStore { } /** - * Get the MSentryPermChange object by ChangeID. Internally invoke + * Get the last processed perm change ID. Internally invoke * getLastProcessedPermChangeIDCore(). * - * @return MSentryPermChange + * @return the change id of last processed MSentryPermChange. */ @VisibleForTesting long getLastProcessedPermChangeID() throws Exception { @@ -3355,6 +3372,38 @@ public class SentryStore { } /** + * Get the last processed path change ID. + * + * @param pm the PersistenceManager + * @return the last processed path changedID + */ + private long getLastProcessedPathChangeIDCore(PersistenceManager pm) { + Query query = pm.newQuery(MSentryPathChange.class); + query.setResult("max(this.changeID)"); + Long changeID = (Long) query.execute(); + if (changeID == null) { + return EMPTY_CHANGE_ID; + } else { + return changeID; + } + } + + /** + * Get the last processed path change ID. Internally invoke + * getLastProcessedPathChangeIDCore(). + * + * @return the change id of last processed MSentryPathChange. + */ + public long getLastProcessedPathChangeID() throws Exception { + return tm.executeTransaction( + new TransactionBlock<Long>() { + public Long execute(PersistenceManager pm) throws Exception { + return getLastProcessedPathChangeIDCore(pm); + } + }); + } + + /** * Get the MSentryPermChange object by ChangeID. * * @param changeID the given changeID. http://git-wip-us.apache.org/repos/asf/sentry/blob/813d10e8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 59646b6..ad6bdda 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; import org.apache.sentry.core.common.exception.*; +import org.apache.sentry.hdfs.PathsUpdate; import org.apache.sentry.hdfs.PermissionsUpdate; import org.apache.sentry.hdfs.UpdateableAuthzPaths; import org.apache.sentry.hdfs.FullUpdateInitializer; @@ -49,6 +50,8 @@ import java.io.File; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.Map; +import java.util.Set; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; @@ -74,7 +77,6 @@ public class HMSFollower implements Runnable { private static final int maxRetriesForLogin = 3; private static final int maxRetriesForConnection = 3; - private volatile UpdateableAuthzPaths authzPaths; private boolean needHiveSnapshot = true; private final LeaderStatusMonitor leaderMonitor; @@ -84,8 +86,11 @@ public class HMSFollower implements Runnable { authzConf = conf; this.leaderMonitor = leaderMonitor; sentryStore = new SentryStore(authzConf); - //TODO: Initialize currentEventID from Sentry db - currentEventID = 0; + + // Initialize currentEventID from Sentry db. If currentEventID + // is empty, need to retrieve a hive snapshot, otherwise not. + currentEventID = getStoredCurrentID(); + needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID); } @VisibleForTesting @@ -227,12 +232,13 @@ public class HMSFollower implements Runnable { CurrentNotificationEventId eventIDBefore = null; CurrentNotificationEventId eventIDAfter = null; + Map<String, Set<String>> pathsFullSnapshot = null; try { eventIDBefore = client.getCurrentNotificationEventId(); LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore)); - fetchFullUpdate(); + pathsFullSnapshot = fetchFullUpdate(); eventIDAfter = client.getCurrentNotificationEventId(); LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", eventIDAfter)); @@ -252,6 +258,7 @@ public class HMSFollower implements Runnable { eventIDAfter)); needHiveSnapshot = false; currentEventID = eventIDAfter.getEventId(); + sentryStore.persistFullPathsImage(pathsFullSnapshot); } NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); @@ -261,25 +268,30 @@ public class HMSFollower implements Runnable { processNotificationEvents(response.getEvents()); } } catch (TException e) { - LOGGER.error("ThriftException occured fetching Notification entries, will try"); + LOGGER.error("ThriftException occurred fetching Notification entries, will try"); e.printStackTrace(); } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) { throw new RuntimeException(e); + } catch (Exception e) { + LOGGER.error("Exception occurred persisting Hive full snapshot into DB"); + e.printStackTrace(); } } /** - * Retrieve HMS full snapshot. + * Retrieve a HMS full snapshot. + * + * @return UpdateableAuthzPaths + * @throws Exception */ - private void fetchFullUpdate() throws Exception { + private Map<String, Set<String>> fetchFullUpdate() throws Exception { FullUpdateInitializer updateInitializer = null; try { updateInitializer = new FullUpdateInitializer(client, authzConf); - // TODO - do we need to save returned authz path? - updateInitializer.createInitialUpdate(); - // TODO: notify HDFS plugin + Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate(); LOGGER.info("#### Hive full update initialization complete !!"); + return pathsUpdate; } finally { if (updateInitializer != null) { try { @@ -291,6 +303,16 @@ public class HMSFollower implements Runnable { } } + /** + * Get current eventID from Sentry DB. + * + * @return the stored currentID + * @throws Exception + */ + private long getStoredCurrentID() throws Exception { + return sentryStore.getLastProcessedPathChangeID(); + } + private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { return "true" .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), "true")));