Repository: sentry Updated Branches: refs/heads/master 5aea068eb -> a001e4455
SENTRY-2031: Add trigger mechanism for Sentry to pull full path snapshot from HMS (Vadim Spector, reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a001e445 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a001e445 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a001e445 Branch: refs/heads/master Commit: a001e445598f62bbf1551282f64d3d600c4775a3 Parents: 5aea068 Author: Vadim Spector <[email protected]> Authored: Mon Nov 6 10:44:32 2017 -0800 Committer: Vadim Spector <[email protected]> Committed: Mon Nov 6 10:44:32 2017 -0800 ---------------------------------------------------------------------- .../apache/sentry/hdfs/ServiceConstants.java | 2 +- .../sentry/service/thrift/HMSFollower.java | 32 ++++++- .../sentry/service/thrift/TestHMSFollower.java | 98 ++++++++++++++++++++ 3 files changed, 130 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index f7412a3..a9afb15 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -44,7 +44,7 @@ public class ServiceConstants { public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc"; public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT = 100; static final String SENTRY_SERVICE_FULL_UPDATE_SIGNAL = 
"sentry.hdfs.sync.full-update-signal"; - static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub"; + public static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub"; public static final String SENTRY_HDFS_INTEGRATION_PATH_PREFIXES = "sentry.hdfs.integration.path.prefixes"; public static final String[] SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT = http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 0861132..c4cc918 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -18,6 +18,8 @@ package org.apache.sentry.service.thrift; +import org.apache.sentry.core.common.utils.PubSub; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED; @@ -26,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jdo.JDODataStoreException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory; * update permissions stored in Sentry using SentryStore and also update the < obj,path > 
state * stored for HDFS-Sentry sync. */ -public class HMSFollower implements Runnable, AutoCloseable { +public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); + private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: "; private static boolean connectedToHms = false; + private SentryHMSClient client; private final Configuration authzConf; private final SentryStore sentryStore; @@ -52,6 +57,7 @@ public class HMSFollower implements Runnable, AutoCloseable { private boolean readyToServe; private final HiveNotificationFetcher notificationFetcher; private final boolean hdfsSyncEnabled; + private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false); private final LeaderStatusMonitor leaderMonitor; @@ -99,6 +105,13 @@ public class HMSFollower implements Runnable, AutoCloseable { client = new SentryHMSClient(authzConf, hiveConnectionFactory); hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory); + + // subscribe to full update notification + if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) { + LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName()); + PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this); + } + } @VisibleForTesting @@ -243,6 +256,13 @@ public class HMSFollower implements Runnable, AutoCloseable { return true; } + // check if forced full update is required, reset update flag to false + // to only do it once per forced full update request. 
+ if (fullUpdateHMS.compareAndSet(true, false)) { + LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request"); + return true; + } + return false; } @@ -435,4 +455,14 @@ public class HMSFollower implements Runnable, AutoCloseable { counterWait.update(eventId); } + + /** + * PubSub.Subscriber callback API + */ + @Override + public void onMessage(PubSub.Topic topic, String message) { + Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS); + LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message); + fullUpdateHMS.set(true); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java index 7d64375..bbcf093 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -49,10 +49,13 @@ import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars; import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; +import org.apache.sentry.core.common.utils.PubSub; import org.apache.sentry.hdfs.UniquePathsUpdate; import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; +import static 
org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB; + import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -79,6 +82,7 @@ public class TestHMSFollower { hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf()); hiveConnectionFactory.init(); configuration.set("sentry.hive.sync.create", "true"); + configuration.set(SENTRY_SERVICE_FULL_UPDATE_PUBSUB, "true"); // enable HDFS sync, so perm and path changes will be saved into DB configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); @@ -141,6 +145,100 @@ public class TestHMSFollower { } @Test + public void testPersistAFullSnapshotWhenFullSnapshotTrigger() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) the following: + * + * HMS client always returns the paths image with the eventId == 1. + * + * On the 1st run: Sentry has not processed any notifications, so this + * should trigger a new full HMS snapshot request with the eventId = 1 + * + * On the 2nd run: Sentry store returns the latest eventId == 1, + * which matches the eventId returned by HMS client. Because of the match, + * no full update is triggered. + * + * On the 3rd run: before the run, full update flag in HMSFollower is set via + * publish-subscribe mechanism. + * Sentry store still returns the latest eventId == 1, + * which matches the eventId returned by HMS client. Because of the match, + * no full update should be triggered. However, because of the trigger set, + * a new full HMS snapshot will be triggered. + * + * On the 4th run: Sentry store returns the latest eventId == 1, + * which matches the eventId returned by HMS client. Because of the match, + * no full update is triggered. This is to check that forced trigger set + * for run 3 only works once. 
+ * + */ + + final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot because AuthzPathsMapping is empty + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + // Saving notificationID is in the same transaction as saving the full snapshot + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + + // 2nd run should not get a snapshot because it is already processed + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + 
reset(sentryStore); + + // 3rd run should not get a snapshot because it is already processed, + // but because of the full update trigger it will, as in the first run + PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message"); + + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + + // 4th run should not get a snapshot because it is already processed and publish-subscribe + // trigger is only supposed to work once. This is exactly like the 2nd run. + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + + } + + @Test public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed() throws Exception { /*
