Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 7aac09ec6 -> 1c6ba5ebe
SENTRY-1612: HMSFollower should persist full HMS snapshot into SentryDB if there is not one. (Hao Hao, Reviewed by: Alexander Kolbasov) Change-Id: I375ea19277fa3092f1825476f1670652e4c981c4 Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/1c6ba5eb Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/1c6ba5eb Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/1c6ba5eb Branch: refs/heads/sentry-ha-redesign Commit: 1c6ba5ebe4a4374b2472633dd21435eec9933d6f Parents: 7aac09e Author: hahao <[email protected]> Authored: Wed Mar 1 18:01:50 2017 -0800 Committer: hahao <[email protected]> Committed: Wed Mar 1 18:01:50 2017 -0800 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 44 ++++++++++++++++---- .../org/apache/sentry/hdfs/PathsUpdate.java | 24 +++++++++++ .../sentry/hdfs/TestFullUpdateInitializer.java | 31 ++++++-------- .../db/service/model/MSentryPathChange.java | 4 ++ .../db/service/persistent/SentryStore.java | 44 +++++++++++++++++++- .../sentry/service/thrift/HMSFollower.java | 40 +++++++++++++----- .../db/service/persistent/TestSentryStore.java | 14 ++++--- 7 files changed, 157 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index f95dd94..146cea2 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ 
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -18,7 +18,7 @@ package org.apache.sentry.hdfs; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -33,9 +33,11 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * FullUpdateInitializer is for fetching hive full update, @@ -297,9 +299,8 @@ public class FullUpdateInitializer implements Closeable { ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT); } - public UpdateableAuthzPaths createInitialUpdate() throws ExecutionException, InterruptedException, TException { - UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new - String[]{"/"}); + public Map<String, Set<String>> createInitialUpdate() throws ExecutionException, + InterruptedException, TException { PathsUpdate tempUpdate = new PathsUpdate(-1, false); List<String> allDbStr = client.getAllDatabases(); for (String dbName : allDbStr) { @@ -322,15 +323,42 @@ public class FullUpdateInitializer implements Closeable { } } - authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock()); - return authzPaths; + return getAuthzObjToPathMapping(tempUpdate); } + /** + * Parsing a pathsUpdate to get the mapping of hiveObj -> [Paths]. + * It only processes {@link TPathChanges}.addPaths, since + * {@link FullUpdateInitializer} only add paths when fetching + * full HMS Paths snapshot. Each path represented as path tree + * concatenated by "/". 
e.g. 'usr/hive/warehouse'.
+ * e.g < usr, hive, warehouse > -> 'usr/hive/warehouse'. + * + * @param paths + * @return a path string concatenated by "/". + */ + public static String cancatePath(Iterable<String> paths) { + return Joiner.on("/").join(paths); + } + + /** + * Split a path a path concatenated by "/" into a path tree represented + * as a list. + * + * @param path + * @return a path tree represented as a list. + */ + public static List<String> splitPath(String path) { + return Lists.newArrayList(Splitter.on("/").split(path)); + } + @Override public byte[] serialize() throws IOException { return ThriftSerializer.serialize(tPathsUpdate); http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java index 0bb6f66..f338ce8 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java @@ -18,6 +18,7 @@ package org.apache.sentry.hdfs; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -29,8 +30,8 @@ import org.junit.Test; import org.mockito.Mockito; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; +import java.util.Map; +import java.util.Set; public class TestFullUpdateInitializer { @@ -107,22 +108,14 @@ public class TestFullUpdateInitializer { FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf); - 
UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate(); - - Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new - String[]{"db1"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new - String[]{"db2"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new - String[]{"db2", "tab21"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new - String[]{"db3"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new - String[]{"db3", "tab31"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new - String[]{"db3", "tab31", "part311"})); - Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new - String[]{"db3", "tab31", "part312"})); + Map<String, Set<String>> update = cacheInitializer.createInitialUpdate(); + + Assert.assertEquals(update.get("db1"), Sets.newHashSet("db1")); + Assert.assertEquals(update.get("db2"), Sets.newHashSet("db2")); + Assert.assertEquals(update.get("db2.tab21"), Sets.newHashSet("db2/tab21")); + Assert.assertEquals(update.get("db3.tab31"), Sets.newHashSet("db3/tab31", + "db3/tab31/part311", "db3/tab31/part312")); + cacheInitializer.close(); } @@ -167,4 +160,4 @@ public class TestFullUpdateInitializer { Assert.assertTrue(e instanceof RuntimeException); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java index 0ca7fe2..a0d3445 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java @@ -89,6 +89,10 @@ public class MSentryPathChange implements MSentryChange { return changeID; } + public long getNotificationID() { + return notificationID; + } + @Override public String toString() { return "MSentryChange [changeID=" + changeID + " , notificationID= " http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index b9272bc..c1186ba 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -121,7 +121,7 @@ public class SentryStore { // is starting from 1. public static final long INIT_CHANGE_ID = 1L; - private static final long EMPTY_CHANGE_ID = 0L; + public static final long EMPTY_CHANGE_ID = 0L; // For counters, representation of the "unknown value" private static final long COUNT_VALUE_UNKNOWN = -1; @@ -2329,6 +2329,24 @@ public class SentryStore { return result; } + /** + * Persist a full hive snapshot into Sentry DB in a single transaction. 
+ * + * @param authzPaths Mapping of hiveObj -> [Paths] + * @throws Exception + */ + public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception { + tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) { + createAuthzPathsMappingCore(pm, authzPath.getKey(), authzPath.getValue()); + } + return null; + } + }); + } + public void createAuthzPathsMapping(final String hiveObj, final Set<String> paths) throws Exception { tm.executeTransactionWithRetry( @@ -3063,7 +3081,8 @@ public class SentryStore { * @param pm the PersistenceManager * @param changeCls the class of a delta c * - * @return the last processed changedID for the delta changes. + * @return the last processed changedID for the delta changes. If no + * change found then return 0. */ private <T extends MSentryChange> long getLastProcessedChangeIDCore( PersistenceManager pm, Class<T> changeCls) { @@ -3091,6 +3110,27 @@ public class SentryStore { } /** + * Get the notification ID of last processed path delta change. + * + * @return the notification ID of latest path change. If no change + * found then return 0. + */ + public long getLastProcessedNotificationID() throws Exception { + return tm.executeTransaction( + new TransactionBlock<Long>() { + public Long execute(PersistenceManager pm) throws Exception { + long changeID = getLastProcessedChangeIDCore(pm, MSentryPathChange.class); + if (changeID == EMPTY_CHANGE_ID) { + return EMPTY_CHANGE_ID; + } else { + MSentryPathChange mSentryPathChange = getMSentryPathChangeByID(changeID); + return mSentryPathChange.getNotificationID(); + } + } + }); + } + + /** * Get the MSentryPermChange object by ChangeID. * * @param changeID the given changeID. 
http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index bdbb0cc..8b07f5b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -32,6 +32,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; import org.apache.sentry.core.common.exception.*; +import org.apache.sentry.hdfs.PathsUpdate; import org.apache.sentry.hdfs.PermissionsUpdate; import org.apache.sentry.hdfs.UpdateableAuthzPaths; import org.apache.sentry.hdfs.FullUpdateInitializer; @@ -52,6 +53,8 @@ import java.net.SocketException; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; @@ -77,7 +80,6 @@ public class HMSFollower implements Runnable { private final SentryStore sentryStore; private String hiveInstance; - private volatile UpdateableAuthzPaths authzPaths; private boolean needHiveSnapshot = true; private final LeaderStatusMonitor leaderMonitor; @@ -87,8 +89,11 @@ public class HMSFollower implements Runnable { authzConf = conf; this.leaderMonitor = leaderMonitor; sentryStore = store; - //TODO: Initialize currentEventID from Sentry db - currentEventID = 0; + + // 
Initialize currentEventID based on the latest persisted notification ID. + // If currentEventID is empty, need to retrieve a full hive snapshot, + currentEventID = getLastProcessedNotificationID(); + needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID); } @VisibleForTesting @@ -222,13 +227,14 @@ public class HMSFollower implements Runnable { // will be dropped. A new attempts will be made after 500 milliseconds when // HMSFollower run again. + Map<String, Set<String>> pathsFullSnapshot; CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId(); LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore)); try { - fetchFullUpdate(); + pathsFullSnapshot = fetchFullUpdate(); } catch (ExecutionException | InterruptedException ex) { - LOGGER.error("#### Encountered failure during fetching one hive full snapshot !!", ex); + LOGGER.error("#### Encountered failure during fetching hive full snapshot !!", ex); return; } @@ -245,6 +251,7 @@ public class HMSFollower implements Runnable { eventIDAfter)); needHiveSnapshot = false; currentEventID = eventIDAfter.getEventId(); + sentryStore.persistFullPathsImage(pathsFullSnapshot); } NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); @@ -292,17 +299,20 @@ public class HMSFollower implements Runnable { } /** - * Retrieve HMS full snapshot. + * Retrieve a Hive full snapshot from HMS. + * + * @return mapping of hiveObj -> [Paths]. + * @throws ExecutionException, InterruptedException, TException */ - private void fetchFullUpdate() throws ExecutionException, InterruptedException, TException { + private Map<String, Set<String>> fetchFullUpdate() + throws ExecutionException, InterruptedException, TException { FullUpdateInitializer updateInitializer = null; try { updateInitializer = new FullUpdateInitializer(client, authzConf); - // TODO - do we need to save returned authz path? 
- updateInitializer.createInitialUpdate(); - // TODO: notify HDFS plugin + Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate(); LOGGER.info("#### Hive full update initialization complete !!"); + return pathsUpdate; } finally { if (updateInitializer != null) { try { @@ -314,6 +324,16 @@ public class HMSFollower implements Runnable { } } + /** + * Get the last processed eventID from Sentry DB. + * + * @return the stored currentID + * @throws Exception + */ + private long getLastProcessedNotificationID() throws Exception { + return sentryStore.getLastProcessedNotificationID(); + } + private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { return "true" .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), "true"))); http://git-wip-us.apache.org/repos/asf/sentry/blob/1c6ba5eb/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index 0e22755..91f15c0 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -19,11 +19,7 @@ package org.apache.sentry.provider.db.service.persistent; import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -2199,6 +2195,14 @@ public class TestSentryStore extends org.junit.Assert { Map<String, Set<String>> 
pathsImage = sentryStore.retrieveFullPathsImage(); assertEquals(2, pathsImage.size()); assertEquals(Sets.newHashSet("/user/hive/warehouse/db1.db/table1"), pathsImage.get("db1.table1")); + + Map<String, Set<String>> authzPaths = new HashMap<>(); + authzPaths.put("db2.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1")); + authzPaths.put("db2.table2", Sets.newHashSet("/user/hive/warehouse/db2.db/table2")); + sentryStore.persistFullPathsImage(authzPaths); + pathsImage = sentryStore.retrieveFullPathsImage(); + assertEquals(4, pathsImage.size()); + assertEquals(Sets.newHashSet("/user/hive/warehouse/db2.db/table1"), pathsImage.get("db2.table1")); } public void testQueryParamBuilder() {
