SENTRY-1760: HMSFollower should detect when a full snapshot from HMS is required (Sergio Pena, reviewed by Alex Kolbasov, Brian Towels)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ca315fe9 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ca315fe9 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ca315fe9 Branch: refs/heads/master Commit: ca315fe971d32a9ca87183746c9dd1744a76e8d4 Parents: eed5383 Author: Alexander Kolbasov <[email protected]> Authored: Thu Jul 20 19:37:28 2017 +0200 Committer: Alexander Kolbasov <[email protected]> Committed: Thu Jul 20 19:37:28 2017 +0200 ---------------------------------------------------------------------- .../sentry/service/thrift/HMSFollower.java | 118 +++++++++++++-- .../sentry/service/thrift/SentryHMSClient.java | 38 +++-- .../sentry/service/thrift/TestHMSFollower.java | 146 +++++++++++++++++++ 3 files changed, 278 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 083ee4c..547a61f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import java.net.SocketException; import java.util.Collection; +import java.util.List; import javax.jdo.JDODataStoreException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -44,7 +45,7 @@ public class HMSFollower implements Runnable, AutoCloseable { private static final 
Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); private static boolean connectedToHms = false; - private final SentryHMSClient client; + private SentryHMSClient client; private final Configuration authzConf; private final SentryStore sentryStore; private final NotificationProcessor notificationProcessor; @@ -91,6 +92,11 @@ public class HMSFollower implements Runnable, AutoCloseable { return connectedToHms; } + @VisibleForTesting + void setSentryHmsClient(SentryHMSClient client) { + this.client = client; + } + @Override public void close() { if (client != null) { @@ -117,7 +123,7 @@ public class HMSFollower implements Runnable, AutoCloseable { // Wake any clients connected to this service waiting for HMS already processed notifications. wakeUpWaitingClientsForSync(lastProcessedNotificationId); // Only the leader should listen to HMS updates - if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { + if (!isLeader()) { // Close any outstanding connections to HMS close(); return; @@ -125,6 +131,10 @@ public class HMSFollower implements Runnable, AutoCloseable { syncupWithHms(lastProcessedNotificationId); } + private boolean isLeader() { + return (leaderMonitor == null) || leaderMonitor.isLeader(); + } + /** * Processes new Hive Metastore notifications. * @@ -145,18 +155,23 @@ public class HMSFollower implements Runnable, AutoCloseable { } try { - long lastProcessedNotificationId = notificationId; - // Create a full HMS snapshot if there is none - // Decision of taking full snapshot is based on AuthzPathsMapping information persisted - // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed. - if (sentryStore.isAuthzPathsMappingEmpty()) { - lastProcessedNotificationId = createFullSnapshot(); - if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) { - return; - } + /* Before getting notifications, it checks if a full HMS snapshot is required. 
*/ + if (isFullSnapshotRequired(notificationId)) { + createFullSnapshot(); + return; + } + + Collection<NotificationEvent> notifications = client.getNotifications(notificationId); + + // After getting notifications, it checks if the HMS did some clean-up and notifications + // are out-of-sync with Sentry. + if (areNotificationsOutOfSync(notifications, notificationId)) { + createFullSnapshot(); + return; } - // Get the new notification from HMS and process them. - processNotifications(client.getNotifications(lastProcessedNotificationId)); + + // Continue with processing new notifications if no snapshots are done. + processNotifications(notifications); } catch (TException e) { // If the underlying exception is around socket exception, // it is better to retry connection to HMS @@ -175,6 +190,75 @@ public class HMSFollower implements Runnable, AutoCloseable { } /** + * Checks if a new full HMS snapshot request is needed by checking if: + * <ul> + * <li>No snapshots has been persisted yet.</li> + * <li>The current notification Id on the HMS is less than the + * latest processed by Sentry.</li> + * </ul> + * + * @param latestSentryNotificationId The notification Id to check against the HMS + * @return True if a full snapshot is required; False otherwise. + * @throws Exception If an error occurs while checking the SentryStore or the HMS client. + */ + private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception { + if (sentryStore.isAuthzPathsMappingEmpty()) { + return true; + } + + long currentHmsNotificationId = client.getCurrentNotificationId(); + if (currentHmsNotificationId < latestSentryNotificationId) { + LOGGER.info("The latest notification ID on HMS is less than the latest notification ID " + + "processed by Sentry. Need to request a full HMS snapshot."); + return true; + } + + return false; + } + + /** + * Checks if the HMS and Sentry processed notifications are out-of-sync. 
+ * This could happen because the HMS did some clean-up of old notifications + * and Sentry was not requesting notifications during that time. + * + * @param events All new notifications to check for an out-of-sync. + * @param latestProcessedId The latest notification processed by Sentry to check against the + * list of notifications events. + * @return True if an out-of-sync is found; False otherwise. + */ + private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events, + long latestProcessedId) { + if (events.isEmpty()) { + return false; + } + + List<NotificationEvent> eventList = (List<NotificationEvent>) events; + long firstNotificationId = eventList.get(0).getEventId(); + long lastNotificationId = eventList.get(eventList.size() - 1).getEventId(); + + /* If the next expected notification is not available, then an out-of-sync might + * have happened due to the following issue: + * + * - HDFS sync was disabled or Sentry was shutdown for a time period longer than + * the HMS notification clean-up thread causing old notifications to be deleted. + */ + if ((latestProcessedId + 1) != firstNotificationId) { + LOGGER.info("Current HMS notifications are out-of-sync with latest Sentry processed" + + "notifications. Need to request a full HMS snapshot."); + return true; + } + + long expectedSize = lastNotificationId - latestProcessedId; + if (expectedSize < eventList.size()) { + LOGGER.info("The HMS notifications fetched has some gaps in the # of events received. These" + + "should not cause an out-of-sync issue. (expected = {}, fetched = {})", + expectedSize, eventList.size()); + } + + return false; + } + + /** * Request for full snapshot and persists it if there is no snapshot available in the * sentry store. Also, wakes-up any waiting clients. 
* @@ -187,6 +271,12 @@ public class HMSFollower implements Runnable, AutoCloseable { if (snapshotInfo.getPathImage().isEmpty()) { return snapshotInfo.getId(); } + + // Check we're still the leader before persisting the new snapshot + if (!isLeader()) { + return SentryStore.EMPTY_NOTIFICATION_ID; + } + try { LOGGER.debug("Persisting HMS path full snapshot"); sentryStore.persistFullPathsImage(snapshotInfo.getPathImage()); @@ -220,7 +310,7 @@ public class HMSFollower implements Runnable, AutoCloseable { isNotificationProcessed = false; try { // Only the leader should process the notifications - if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { + if (!isLeader()) { return; } isNotificationProcessed = notificationProcessor.processNotificationEvent(event); http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java index 05518e8..4f76a94 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java @@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory; * <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to * request HMS snapshots and also for new notifications. 
*/ -final class SentryHMSClient implements AutoCloseable { +class SentryHMSClient implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(SentryHMSClient.class); private final Configuration conf; @@ -213,27 +213,27 @@ final class SentryHMSClient implements AutoCloseable { LOGGER.error("Client is not connected to HMS"); return Collections.emptyList(); } + LOGGER.debug("Checking for notifications beyond {}", notificationId); - // HIVE-15761: Currently getNextNotification API may return an empty - // NotificationEventResponse causing TProtocolException. - // Workaround: Only processes the notification events newer than the last updated one. + + // A bug HIVE-15761 (fixed on Hive 2.4.0) should allow requesting notifications using + // an unprocessed notification ID without causing an exception. For now, we just + // leave this workaround and log debug messages. CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId()); - if (eventId.getEventId() < notificationId) { - LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is" + if (eventId != null && eventId.getEventId() < notificationId) { + LOGGER.debug("Last notification of HMS is smaller than what sentry processed, Something is" + "wrong. Sentry will request a full Snapshot"); - // TODO Path Mapping info should be cleared so that HMSFollower would request for full - // snapshot in the subsequent run. return Collections.emptyList(); } - if (eventId.getEventId() == notificationId) { + if (eventId != null && eventId.getEventId() == notificationId) { return Collections.emptyList(); } NotificationEventResponse response = client.getNextNotification(notificationId, Integer.MAX_VALUE, null); - if (response.isSetEvents()) { + if (response != null && response.isSetEvents()) { LOGGER.debug("Last Id processed:{}. 
Received collection of notifications, Size:{}", notificationId, response.getEvents().size()); return response.getEvents(); @@ -241,4 +241,22 @@ final class SentryHMSClient implements AutoCloseable { return Collections.emptyList(); } + + /** + * @return the latest notification Id logged by the HMS + * @throws Exception when an error occurs when talking to the HMS client + */ + public long getCurrentNotificationId() throws Exception { + if (client == null) { + LOGGER.error("Client is not connected to HMS"); + return SentryStore.EMPTY_NOTIFICATION_ID; + } + + CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); + if (eventId.isSetEventId()) { + return eventId.getEventId(); + } + + return SentryStore.EMPTY_NOTIFICATION_ID; + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java index d67c162..fdf52bf 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -20,10 +20,14 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; @@ -36,6 +40,7 @@ import 
org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType; import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; import org.apache.sentry.hdfs.Updateable; +import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.junit.BeforeClass; @@ -60,6 +65,147 @@ public class TestHMSFollower { configuration.set("sentry.hive.sync.create", "true"); } + @Test + public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) that Sentry has not processed any notifications, so this + * should trigger a new full HMS snapshot request with the eventId = 1 + */ + + final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Set<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot because AuthzPathsMapping is empty + Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); + hmsFollower.run(); + 
Mockito.verify(sentryStore, times(1)).persistFullPathsImage(fullSnapshot.getPathImage()); + Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + Mockito.reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap()); + Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + @Test + public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed() + throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) that Sentry already processed (and persisted) a notification + * with Id = 5, but the latest notification processed by the HMS is eventId = 1. So, an + * out-of-sync issue is happening on Sentry and HMS. 
This out-of-sync issue should trigger + * a new full HMS snapshot request with the same eventId = 1; + */ + + final long SENTRY_PROCESSED_EVENT_ID = 5L; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Set<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot + Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + Mockito.verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap()); + Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong()); + + Mockito.reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap()); + Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + @Test + public void testPersistAFullSnapshotWhenNextExpectedEventIsNotAvailable() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) that Sentry already processed (and 
persisted) a notification + * with Id = 1, and the latest notification processed by the HMS is eventId = 5. So, new + * notifications should be fetched. + * + * The number of expected notifications should be 4, but we simulate that we fetch only one + * notification with eventId = 5 causing an out-of-sync because the expected notification + * should be 2. This out-of-sync should trigger a new full HMS snapshot request with the + * same eventId = 5. + */ + + final long SENTRY_PROCESSED_EVENT_ID = 1L; + final long HMS_PROCESSED_EVENT_ID = 5L; + + // Mock that returns a full snapshot + Map<String, Set<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); + Mockito.when(sentryHmsClient.getNotifications(SENTRY_PROCESSED_EVENT_ID)) + .thenReturn(Collections.singletonList( + new NotificationEvent(fullSnapshot.getId(), 0, "", ""))); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot + Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + Mockito.verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap()); + Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong()); + + Mockito.reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + 
Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap()); + Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + /** * Constructs create database event and makes sure that appropriate sentry store API's * are invoke when the event is processed by hms follower.
