Repository: sentry Updated Branches: refs/heads/master 0235d5959 -> 5b8cec9cd
SENTRY-1712: Add trigger mechanism for Sentry to push full path snapshot to Name Node. (Vadim Spector, reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/3a97427d Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/3a97427d Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/3a97427d Branch: refs/heads/master Commit: 3a97427db410473189e8c0fe05cb2d6815978a56 Parents: 64476a7 Author: Vadim Spector <[email protected]> Authored: Fri Nov 3 10:06:56 2017 -0700 Committer: Vadim Spector <[email protected]> Committed: Fri Nov 3 10:06:56 2017 -0700 ---------------------------------------------------------------------- .../apache/sentry/hdfs/ServiceConstants.java | 1 + .../org/apache/sentry/hdfs/SentryPlugin.java | 57 +++++++++++++++----- .../hdfs/TestSentryHDFSServiceProcessor.java | 46 +++++++++++++++- 3 files changed, 90 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/3a97427d/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index d65207f..f7412a3 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -44,6 +44,7 @@ public class ServiceConstants { public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc"; public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT = 100; static final String 
SENTRY_SERVICE_FULL_UPDATE_SIGNAL = "sentry.hdfs.sync.full-update-signal"; + static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub"; public static final String SENTRY_HDFS_INTEGRATION_PATH_PREFIXES = "sentry.hdfs.integration.path.prefixes"; public static final String[] SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT = http://git-wip-us.apache.org/repos/asf/sentry/blob/3a97427d/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java index ee528be..5890948 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.SentryInvalidInputException; +import org.apache.sentry.core.common.utils.PubSub; import org.apache.sentry.core.common.utils.SigUtils; import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; @@ -41,6 +42,7 @@ import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest; import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.service.thrift.HMSFollower; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,20 +74,29 @@ import static org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants * The image number may be used to identify whether new full updates are persisted and need * to be retrieved 
instead of delta updates. * <p> - * SentryPlugin also implements signal-triggered mechanism of full path - * updates from HMS to Sentry and from Sentry to NameNode, to address - * mission-critical out-of-sync situations that may be encountered in the field. + * SentryPlugin implements mechanism of triggering full path updates from Sentry to NameNode, + * to address mission-critical out-of-sync situations that may be encountered in the field. * Those out-of-sync situations may not be detectable via the exsiting sequence * numbers mechanism (most likely due to the implementation bugs). * <p> - * To facilitate signal-triggered full update from Sentry to NameNode, - * the boolean variables 'fullUpdateNN' is used to ensure that Sentry sends full - * update to NameNode, and does so only once per signal. + * To trigger full update from Sentry to NameNode, the boolean variable 'fullUpdateNN' is + * used to ensure that Sentry sends full update to NameNode, and does so only once per + * triggering event. * </ol> * The details: * <ol> * <li> - * Upon receiving a signal, fullUpdateNN is set to true. + * There are two mechanisms to trigger full update: by signal (deprecated) and via WebUI. + * Both mechanisms are configurable and turned OFF by default. + * <li> + * To use signal mechanism, SentryPlugin uses SigUtils to subscribe to specific + * (configurable) signal that should be delivered to JVM running Sentry server. + * <li> + * To use the WebUI mechanism, SentryPlugin uses PubSub which provides a publish-subscribe + * framework. SentryPlugin subscribes to PubSub.Topic.HDFS_SYNC_NN topic. + * + * Upon receiving a signal, or upon being notified via pub-sub mechanism, fullUpdateNN + * is set to true. * <li> * When NameNode calls getAllPathsUpdatesFrom() asking for partial update, * Sentry checks if both fullUpdateNN == true. 
If yes, it sends full update back @@ -93,9 +104,10 @@ import static org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants * </ol> */ -public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListener { +public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListener, PubSub.Subscriber { private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class); + private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: "; private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false); public static volatile SentryPlugin instance; @@ -131,6 +143,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen } } } + + // subscribe to full update notification + if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) { + LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_NN.getName()); + PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_NN, this); + } } /** @@ -148,7 +166,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen * Sentry is in the middle of signal-triggered full update. 
* It already got a full update from HMS */ - LOGGER.info("SIGNAL HANDLING: sending full PATH update to NameNode"); + LOGGER.info(FULL_UPDATE_TRIGGER + "sending full PATH update to NameNode"); fullUpdateNN.set(false); // don't do full NN update till the next signal List<PathsUpdate> updates = pathsUpdater.getAllUpdatesFrom(SEQUENCE_NUMBER_UPDATE_UNINITIALIZED, IMAGE_NUMBER_UPDATE_UNINITIALIZED); @@ -167,15 +185,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen if (updates != null) { if (!updates.isEmpty()) { if (updates.get(0).hasFullImage()) { - LOGGER.info("SIGNAL HANDLING: Confirmed full PATH update to NameNode for pathSeqNum {} and pathImgNum {}", pathSeqNum, pathImgNum); + LOGGER.info(FULL_UPDATE_TRIGGER + "Confirmed full PATH update to NameNode for pathSeqNum {} and pathImgNum {}", pathSeqNum, pathImgNum); } else { - LOGGER.warn("SIGNAL HANDLING: Sending partial instead of full PATH update to NameNode for pathSeqNum {} and pathImgNum {} (???)", pathSeqNum, pathImgNum); + LOGGER.warn(FULL_UPDATE_TRIGGER + "Sending partial instead of full PATH update to NameNode for pathSeqNum {} and pathImgNum {} (???)", pathSeqNum, pathImgNum); } } else { - LOGGER.warn("SIGNAL HANDLING: Sending empty instead of full PATH update to NameNode for pathSeqNum {} and pathImgNum {} (???)", pathSeqNum, pathImgNum); + LOGGER.warn(FULL_UPDATE_TRIGGER + "Sending empty instead of full PATH update to NameNode for pathSeqNum {} and pathImgNum {} (???)", pathSeqNum, pathImgNum); } } else { - LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full PATH update to NameNode for pathSeqNum {} and pathImgNum {} (???)", pathSeqNum, pathImgNum); + LOGGER.warn(FULL_UPDATE_TRIGGER + "returned NULL instead of full PATH update to NameNode for pathSeqNum {} and pathImgNum {} (???)", pathSeqNum, pathImgNum); } return updates; } @@ -334,12 +352,25 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen return update; } + /** + * 
SigUtils.SigListener callback API + */ @Override public void onSignal(final String sigName) { LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update"); fullUpdateNN.set(true); } + /** + * PubSub.Subscriber callback API + */ + @Override + public void onMessage(PubSub.Topic topic, String message) { + Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_NN, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_NN); + LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message); + fullUpdateNN.set(true); + } + private String getAuthzObj(TSentryPrivilege privilege) { String authzObj = null; if (!SentryStore.isNULL(privilege.getDbName())) { http://git-wip-us.apache.org/repos/asf/sentry/blob/3a97427d/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceProcessor.java index 7a1b8e0..f09d1b2 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceProcessor.java +++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceProcessor.java @@ -18,6 +18,8 @@ package org.apache.sentry.hdfs; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.utils.PubSub; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateRequest; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse; import org.apache.sentry.provider.db.SentryPolicyStorePlugin; @@ -45,7 +47,10 @@ public class TestSentryHDFSServiceProcessor { public static void setUp() throws SentryPolicyStorePlugin.SentryPluginException { serviceProcessor = 
new SentryHDFSServiceProcessor(); sentryStoreMock = Mockito.mock(SentryStore.class); - new SentryPlugin().initialize(new Configuration(), sentryStoreMock); + Configuration conf = new Configuration(); + // enable full update trigger via pub-sub mechanism + conf.set(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, "true"); + new SentryPlugin().initialize(conf, sentryStoreMock); } @Test @@ -131,6 +136,45 @@ public class TestSentryHDFSServiceProcessor { assertFalse(sentryUpdates.getAuthzPermUpdate().get(0).isHasfullImage()); } + /** + * Verify that publish-subscribe mechanism works for triggering full paths updates + */ + @Test + public void testRequestSyncUpdatesWhenPubSubNotifyReturnsFullPathsUpdate() throws Exception { + // Configure SentryStore mock to return small sequence numbers + Mockito.when(sentryStoreMock.getLastProcessedImageID()) + .thenReturn(1L); + Mockito.when(sentryStoreMock.getLastProcessedPathChangeID()) + .thenReturn(2L); + Mockito.when(sentryStoreMock.getLastProcessedPermChangeID()) + .thenReturn(2L); + // Also, configure SentryStore mock to return full paths update once; + // throw an exception afterwards. + Mockito.when(sentryStoreMock.retrieveFullPathsImageUpdate(Mockito.any())) + .thenReturn(new PathsUpdate(8, 5, true)) + .thenThrow(new RuntimeException("Not supposed to ask for full path update first time")); + + // now ask for larger sequence numbers - supposed to return nothing + TAuthzUpdateRequest updateRequest = new TAuthzUpdateRequest(3, 3, 1); + TAuthzUpdateResponse sentryUpdates= serviceProcessor.get_authz_updates(updateRequest); + // no permissions updates + assertEquals(0, sentryUpdates.getAuthzPermUpdateSize()); + // no paths updates + assertEquals(0, sentryUpdates.getAuthzPathUpdateSize()); + + // Now set full update trigger ... + PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_NN, "test message"); + // ... then repeat exactly the same update call + sentryUpdates= serviceProcessor.get_authz_updates(updateRequest); + // ... 
still no permissions updates returned + assertEquals(0, sentryUpdates.getAuthzPermUpdateSize()); + // ... but now we are getting full paths update, as intended by trigger logic + assertEquals(1, sentryUpdates.getAuthzPathUpdateSize()); + assertEquals(5, sentryUpdates.getAuthzPathUpdate().get(0).getImgNum()); + assertEquals(8, sentryUpdates.getAuthzPathUpdate().get(0).getSeqNum()); + assertTrue(sentryUpdates.getAuthzPathUpdate().get(0).isHasFullImage()); + } + @Test public void testRequestSyncUpdatesWhenNoUpdatesExistReturnsEmptyResults() throws Exception { Mockito.when(sentryStoreMock.getLastProcessedImageID())
