SENTRY-1756: Passive nodes should still follow latest notification ID (Sergio Pena, reviewed by Alex Kolbasov)
Change-Id: I4ddd40ad6c6c0adc3f7a5684380f8e184678d898 Reviewed-on: http://gerrit.sjc.cloudera.com:8080/22485 Tested-by: Jenkins User Reviewed-by: Alexander Kolbasov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/4155eace Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/4155eace Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/4155eace Branch: refs/for/cdh5-1.5.1_ha Commit: 4155eace9cb8f84ca07030130a6e0351c2877a0b Parents: d8e2282 Author: Alexander Kolbasov <[email protected]> Authored: Wed May 10 18:02:31 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Wed May 10 20:35:30 2017 -0700 ---------------------------------------------------------------------- .../sentry/service/thrift/HMSFollower.java | 45 +++++++++++++++++--- 1 file changed, 39 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/4155eace/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 29f6957..d2bc5af 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -210,6 +210,13 @@ public class HMSFollower implements Runnable, AutoCloseable { @Override public void run() { + // Wake any clients connected to this service waiting for HMS already processed notifications. + try { + wakeUpWaitingClientsForSync(getLastProcessedNotificationID()); + } catch (Exception e) { + LOGGER.error("Couldn't wake up HMS waiters because an error attempting to get the latest notification ID.", e); + } + // Only the leader should listen to HMS updates if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { // Close any outstanding connections to HMS @@ -217,6 +224,34 @@ public class HMSFollower implements Runnable, AutoCloseable { return; } + processHiveMetastoreUpdates(); + } + + /** + * Wakes up HMS waiters waiting for a specific event notification. + * + * @param eventID + */ + private void wakeUpWaitingClientsForSync(long eventID) { + CounterWait counterWait = sentryStore.getCounterWait(); + + // Wake up any HMS waiters that are waiting for this ID. + // counterWait should never be null, but tests mock SentryStore and a mocked one + // doesn't have it. + if (counterWait != null) { + counterWait.update(eventID); + } + } + + /** + * Processes new Hive Metastore notifications. + * + * If no notifications are processed yet, then it does a full initial snapshot of the Hive Metastore + * followed by new notifications updates that could have happened after it. + * + * Clients connections waiting for an event notification will be woken up afterwards. + */ + private void processHiveMetastoreUpdates() { if (client == null) { try { client = getMetaStoreClient(authzConf); @@ -273,6 +308,9 @@ public class HMSFollower implements Runnable, AutoCloseable { needHiveSnapshot = false; currentEventID = eventIDAfter.getEventId(); sentryStore.persistFullPathsImage(pathsFullSnapshot); + + // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value. + wakeUpWaitingClientsForSync(currentEventID); } // HIVE-15761: Currently getNextNotification API may return an empty @@ -374,7 +412,6 @@ public class HMSFollower implements Runnable, AutoCloseable { */ void processNotificationEvents(List<NotificationEvent> events) throws Exception { SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer(); - final CounterWait counterWait = sentryStore.getCounterWait(); for (NotificationEvent event : events) { String dbName; @@ -549,11 +586,7 @@ public class HMSFollower implements Runnable, AutoCloseable { } currentEventID = event.getEventId(); // Wake up any HMS waiters that are waiting for this ID. - // counterWait should never be null, but tests mock SentryStore and a mocked one - // doesn't have it. - if (counterWait != null) { - counterWait.update(currentEventID); - } + wakeUpWaitingClientsForSync(currentEventID); } }
