Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 0b8301f28 -> e96151ad3
SENTRY-1463: Ensure HMS point-in-time snapshot consistency(Hao Hao, Reviewed by: Sravya Tirukkovalur, Li Li) Change-Id: I3aad74e572fed5b0ced7104457cd2441d0a7b754 Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/e96151ad Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/e96151ad Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/e96151ad Branch: refs/heads/sentry-ha-redesign Commit: e96151ad36f92c1c54427906f6c5a6484b60a9cd Parents: 0b8301f Author: hahao <[email protected]> Authored: Mon Oct 24 16:21:51 2016 -0700 Committer: hahao <[email protected]> Committed: Mon Oct 24 20:42:06 2016 -0700 ---------------------------------------------------------------------- .../sentry/service/thrift/HMSFollower.java | 88 +++++++++++++------- .../sentry/service/thrift/SentryService.java | 4 +- 2 files changed, 61 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/e96151ad/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 8989292..0baa1a2 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import 
org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; @@ -71,17 +72,17 @@ public class HMSFollower implements Runnable { private String hiveInstance; final static int maxRetriesForLogin = 3; final static int maxRetriesForConnection = 3; + private volatile UpdateableAuthzPaths authzPaths; - private AtomicBoolean fullUpdateComplete; + private boolean needHiveSnapshot = true; - HMSFollower(Configuration conf, AtomicBoolean fullUpdateComplete) throws SentryNoSuchObjectException, + HMSFollower(Configuration conf) throws SentryNoSuchObjectException, SentryAccessDeniedException, SentrySiteConfigurationException, IOException { //TODO: Handle any possible exceptions or throw specific exceptions LOGGER.info("HMSFollower is being initialized"); authzConf = conf; sentryStore = new SentryStore(authzConf); //TODO: Initialize currentEventID from Sentry db currentEventID = 0; - this.fullUpdateComplete = fullUpdateComplete; } @VisibleForTesting @@ -183,7 +184,7 @@ public class HMSFollower implements Runnable { } public void run() { - if( client == null ) { + if (client == null) { try { client = getMetaStoreClient(authzConf); if (client == null) { @@ -194,16 +195,55 @@ public class HMSFollower implements Runnable { LOGGER.info("HMSFollower of Sentry successfully connected to HMS"); } } catch (Exception e) { - + LOGGER.error("HMSFollower cannot connect to HMS!!"); + return; } } - if (needFullUpdate()) { - // TODO: read currentEventID from Sentry DB - // This guarantee events before failover but did not applied can be fetch later. - fetchFullUpdate(); - } try { + if (isNeedHiveSnapshot()) { + // TODO: expose time used for full update in the metrics + + // To ensure point-in-time snapshot consistency, need to make sure + // there were no HMS updates while retrieving the snapshot. + // In detail the logic is: + // + // 1. 
Read current HMS notification ID_initial + // 2. Read HMS metadata state + // 3. Read current notification ID_new + // 4. If ID_initial != ID_new then the attempt to retrieve the full HMS snapshot + // is dropped. A new attempt will be made after 500 milliseconds when + // HMSFollower runs again. + + CurrentNotificationEventId eventIDBefore = null; + CurrentNotificationEventId eventIDAfter = null; + + try { + eventIDBefore = client.getCurrentNotificationEventId(); + LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", + eventIDBefore)); + fetchFullUpdate(); + eventIDAfter = client.getCurrentNotificationEventId(); + LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", + eventIDAfter)); + } catch (Exception ex) { + LOGGER.error("#### Encountered failure during fetching one hive full snapshot !! Current NotificationID = " + + eventIDAfter.toString(), ex); + return; + } + + if (!eventIDBefore.equals(eventIDAfter)) { + LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! Current NotificationID = " + + eventIDAfter.toString()); + return; + } + + LOGGER.info(String.format("Successfully fetched hive full snapshot, Current NotificationID = %s.", + eventIDAfter)); + needHiveSnapshot = false; + currentEventID = eventIDAfter.getEventId(); + } + NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); if (response.isSetEvents()) { LOGGER.info(String.format("CurrentEventID = %s. Processing %s events", @@ -219,20 +259,16 @@ public class HMSFollower implements Runnable { } /** - * Block the sentry service until it starts up, signal main thread - * the full update fetch process is done. + * Retrieve HMS full snapshot. 
*/ - private void fetchFullUpdate() { - fullUpdateComplete.getAndSet(false); - + private void fetchFullUpdate() throws Exception { FullUpdateInitializer updateInitializer = null; + try { updateInitializer = new FullUpdateInitializer(client, authzConf); HMSFollower.this.authzPaths = updateInitializer.createInitialUpdate(); // TODO: notify HDFS plugin LOGGER.info("#### Hive full update initialization complete !!"); - } catch (Exception e) { - LOGGER.error("#### Could not create hive full update !!", e); } finally { if (updateInitializer != null) { try { @@ -241,20 +277,16 @@ public class HMSFollower implements Runnable { LOGGER.info("#### Exception while closing updateInitializer !!", e); } } - - fullUpdateComplete.getAndSet(true); } } - private boolean needFullUpdate() { - // Currently fullUpdateComplete is indicator that server is starting up - // and in request of a full update. - // TODO: set fullUpdateComplete based on notification id stored in SentryDB. - if (!fullUpdateComplete.get()) { - return true; - } else { - return false; - } + private boolean isNeedHiveSnapshot() { + // Indicates whether a full Hive snapshot still needs to be fetched. 
+ + // TODO: Will need to get Hive snapshot if the Notification ID + // we are requesting has been rolled over in the NotificationLog + // table of Hive + return needHiveSnapshot; } private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { http://git-wip-us.apache.org/repos/asf/sentry/blob/e96151ad/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 9e9358b..bc1fe1f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.EventListener; import java.util.List; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; @@ -95,7 +94,6 @@ public class SentryService implements Callable { private long maxMessageSize; private final boolean isHA; private final Activator act; - private AtomicBoolean fullUpdateComplete = new AtomicBoolean(false); SentryMetrics sentryMetrics; public SentryService(Configuration conf) throws Exception { @@ -159,7 +157,7 @@ public class SentryService implements Callable { //TODO: Enable only if Hive is using Sentry? try { hmsFollowerExecutor = Executors.newScheduledThreadPool(1); - hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, fullUpdateComplete), 60000, 500, TimeUnit.MILLISECONDS); + hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf), 60000, 500, TimeUnit.MILLISECONDS); }catch(Exception e) { //TODO: Handle LOGGER.error("Could not start HMSFollower");
