Repository: sentry Updated Branches: refs/heads/master 24ea4a412 -> bfb07b4a5
SENTRY-1888: Sentry might not fetch all HMS duplicated events IDs when requested (Sergio Pena, reviewed by Alexander Kolbasov) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/bfb07b4a Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/bfb07b4a Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/bfb07b4a Branch: refs/heads/master Commit: bfb07b4a54ac9a69b1fbea1a44751008e70bd9d5 Parents: 24ea4a4 Author: Sergio Pena <[email protected]> Authored: Fri Sep 1 09:54:34 2017 -0500 Committer: Sergio Pena <[email protected]> Committed: Fri Sep 1 09:54:34 2017 -0500 ---------------------------------------------------------------------- .../apache/sentry/hdfs/UniquePathsUpdate.java | 2 +- .../db/service/persistent/SentryStore.java | 23 +++ .../sentry/service/thrift/HMSFollower.java | 50 +++-- .../service/thrift/HiveNotificationFetcher.java | 198 +++++++++++++++++++ .../sentry/service/thrift/SentryHMSClient.java | 59 ------ .../sentry/service/thrift/TestHMSFollower.java | 58 ++++-- .../thrift/TestHiveNotificationFetcher.java | 163 +++++++++++++++ .../service/thrift/TestSentryHMSClient.java | 125 ------------ 8 files changed, 460 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java index 7dae2f5..38b4e2a 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UniquePathsUpdate.java @@ -47,7 +47,7 @@ public class 
UniquePathsUpdate extends PathsUpdate { return eventHash; } - private String sha1(NotificationEvent event) { + public static String sha1(NotificationEvent event) { StringBuilder sb = new StringBuilder(); sb.append(event.getEventId()); http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 593b92f..04f6b43 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -4120,4 +4120,27 @@ public class SentryStore { tbs.add(transactionBlock); tm.executeTransactionBlocksWithRetry(tbs); } + + /** + * Checks if a notification was already processed by searching for the hash value + * on the MSentryPathChange table. 
+ * + * @param hash A SHA-1 hex hash that represents a unique notification + * @return True if the notification was already processed; False otherwise + */ + public boolean isNotificationProcessed(final String hash) throws Exception { + return tm.executeTransactionWithRetry(new TransactionBlock<Boolean>() { + @Override + public Boolean execute(PersistenceManager pm) throws Exception { + pm.setDetachAllOnCommit(false); + Query query = pm.newQuery(MSentryPathChange.class); + query.setFilter("this.notificationHash == hash"); + query.setUnique(true); + query.declareParameters("java.lang.String hash"); + MSentryPathChange changes = (MSentryPathChange) query.execute(hash); + + return changes != null; + } + }); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 35da6fc..d4feb38 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -48,6 +48,7 @@ public class HMSFollower implements Runnable, AutoCloseable { private final Configuration authzConf; private final SentryStore sentryStore; private final NotificationProcessor notificationProcessor; + private final HiveNotificationFetcher notificationFetcher; private final boolean hdfsSyncEnabled; private final LeaderStatusMonitor leaderMonitor; @@ -60,7 +61,7 @@ public class HMSFollower implements Runnable, AutoCloseable { * @param leaderMonitor singleton instance of LeaderStatusMonitor */ HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor 
leaderMonitor, - HiveSimpleConnectionFactory hiveConnectionFactory) { + HiveConnectionFactory hiveConnectionFactory) { this(conf, store, leaderMonitor, hiveConnectionFactory, null); } @@ -74,7 +75,7 @@ public class HMSFollower implements Runnable, AutoCloseable { */ @VisibleForTesting public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, - HiveSimpleConnectionFactory hiveConnectionFactory, String authServerName) { + HiveConnectionFactory hiveConnectionFactory, String authServerName) { LOGGER.info("HMSFollower is being initialized"); authzConf = conf; this.leaderMonitor = leaderMonitor; @@ -88,6 +89,7 @@ public class HMSFollower implements Runnable, AutoCloseable { notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf); client = new SentryHMSClient(authzConf, hiveConnectionFactory); hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync + notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory); } @VisibleForTesting @@ -110,6 +112,8 @@ public class HMSFollower implements Runnable, AutoCloseable { LOGGER.error("Failed to close the Sentry Hms Client", failure); } } + + notificationFetcher.close(); } @Override @@ -169,7 +173,8 @@ public class HMSFollower implements Runnable, AutoCloseable { return; } - Collection<NotificationEvent> notifications = client.getNotifications(notificationId); + Collection<NotificationEvent> notifications = + notificationFetcher.fetchNotifications(notificationId); // After getting notifications, it checks if the HMS did some clean-up and notifications // are out-of-sync with Sentry. 
@@ -214,7 +219,7 @@ public class HMSFollower implements Runnable, AutoCloseable { return true; } - long currentHmsNotificationId = client.getCurrentNotificationId(); + long currentHmsNotificationId = notificationFetcher.getCurrentNotificationId(); if (currentHmsNotificationId < latestSentryNotificationId) { LOGGER.info("The latest notification ID on HMS is less than the latest notification ID " + "processed by Sentry. Need to request a full HMS snapshot."); @@ -240,30 +245,33 @@ public class HMSFollower implements Runnable, AutoCloseable { return false; } + /* + * If the sequence of notifications has a gap, then an out-of-sync condition might + * have happened due to the following issue: + * + * - HDFS sync was disabled or Sentry was shut down for a time period longer than + * the interval after which the HMS notification clean-up thread deletes old notifications. + * + * HMS notifications may contain both gaps in the sequence and duplicates + * (the same ID repeated more than once for different events). + * + * Because duplicates are accepted (see HiveNotificationFetcher for more info), a gap is + * detected only if the 1st notification received is higher than the latest ID processed + 1. + * e.g. + * 1st ID = 3, latest ID = 3 (duplicate found but no gap detected) + * 1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected) + * 1st ID = 5, latest ID = 3 (a gap is detected) + */ + List<NotificationEvent> eventList = (List<NotificationEvent>) events; long firstNotificationId = eventList.get(0).getEventId(); - long lastNotificationId = eventList.get(eventList.size() - 1).getEventId(); - - // - // If the next expected notification is not available, then an out-of-sync might - // have happened due to the following issue: - // - // - HDFS sync was disabled or Sentry was shutdown for a time period longer than - // the HMS notification clean-up thread causing old notifications to be deleted.
- // - if ((latestProcessedId + 1) != firstNotificationId) { + + if (firstNotificationId > (latestProcessedId + 1)) { LOGGER.info("Current HMS notifications are out-of-sync with latest Sentry processed" + "notifications. Need to request a full HMS snapshot."); return true; } - long expectedSize = lastNotificationId - latestProcessedId; - if (expectedSize < eventList.size()) { - LOGGER.info("The HMS notifications fetched has some gaps in the # of events received. These" - + "should not cause an out-of-sync issue. (expected = {}, fetched = {})", - expectedSize, eventList.size()); - } - return false; } http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java new file mode 100644 index 0000000..4d32992 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java @@ -0,0 +1,198 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + <p> + http://www.apache.org/licenses/LICENSE-2.0 + <p> + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.sentry.hdfs.UniquePathsUpdate; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class used to fetch Hive MetaStore notifications. + */ +public final class HiveNotificationFetcher implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class); + + private final SentryStore sentryStore; + private final HiveConnectionFactory hmsConnectionFactory; + private HiveMetaStoreClient hmsClient; + + /* The following cache and last filtered ID help us make fewer calls to the DB */ + private long lastIdFiltered = 0; + private Set<String> cache = new HashSet<>(); + + HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) { + this.sentryStore = sentryStore; + this.hmsConnectionFactory = hmsConnectionFactory; + } + + /** + * Fetches new HMS notifications that appeared since a specified event ID. The returned list may + * include notifications with the same specified ID if they were not seen by Sentry. + * + * @param lastEventId The event ID to use to request notifications. + * @return A list of newer notifications unseen by Sentry. + * @throws Exception If an error occurs while communicating with the HMS.
+ */ + List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception { + return fetchNotifications(lastEventId, Integer.MAX_VALUE); + } + + /** + * Fetches new HMS notifications that appeared since a specified event ID. The returned list may + * include notifications with the same specified ID if they were not seen by Sentry. + * + * @param lastEventId The event ID to use to request notifications. + * @param maxEvents The maximum number of events to fetch. + * @return A list of newer notifications unseen by Sentry. + * @throws Exception If an error occurs while communicating with the HMS. + */ + List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception { + NotificationFilter filter = null; + + /* + * HMS may return duplicated events that were committed after the previous request. To fetch + * those newer duplicated events, we request new notifications starting from the last seen ID - 1. + * + * A current problem is that we could miss duplicates committed much later, but because + * HMS does not guarantee their order, it is safer to avoid processing them. + * + * TODO: We can avoid doing this once HIVE-16886 is fixed. + */ + if (lastEventId > 0) { + filter = createNotificationFilterFor(lastEventId); + lastEventId--; + } + + LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId); + NotificationEventResponse response = + getHmsClient().getNextNotification(lastEventId, maxEvents, filter); + + if (response != null && response.isSetEvents()) { + LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize()); + return response.getEvents(); + } + + return Collections.emptyList(); + } + + /** + * Returns an HMS notification filter for a specific notification ID. HMS notifications may + * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
+ * + * @param id the notification ID to filter + * @return the HMS notification filter + */ + private NotificationFilter createNotificationFilterFor(final long id) { + /* + * A SHA-1 hex hash that uniquely identifies each processed notification is persisted in the Sentry DB. + * To avoid unnecessary calls to the DB, we use a cache that keeps the seen hashes of the + * specified ID. If a new filter ID is used, then we clear the cache. + */ + + if (lastIdFiltered != id) { + lastIdFiltered = id; + cache.clear(); + } + + return new NotificationFilter() { + @Override + public boolean accept(NotificationEvent notificationEvent) { + if (notificationEvent.getEventId() == id) { + String hash = UniquePathsUpdate.sha1(notificationEvent); + + try { + if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) { + cache.add(hash); + + LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id); + return false; + } + } catch (Exception e) { + LOGGER.error("An error occurred while checking if notification {} is already " + + "processed: {}", id, e.getMessage()); + + // We cannot throw an exception from this filter, so we return false, assuming this + // notification was already processed + return false; + } + } + + return true; + } + }; + } + + /** + * Gets the HMS client connection object. + * It will create a new connection if no connection object exists. + * + * @return The HMS client used to communicate with the Hive MetaStore. + * @throws Exception If it cannot connect to the HMS service.
+ */ + private HiveMetaStoreClient getHmsClient() throws Exception { + if (hmsClient == null) { + try { + hmsClient = hmsConnectionFactory.connect().getClient(); + } catch (Exception e) { + LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage()); + throw e; + } + } + + return hmsClient; + } + + /** + * @return the latest notification Id logged by the HMS + * @throws Exception when an error occurs when talking to the HMS client + */ + long getCurrentNotificationId() throws Exception { + CurrentNotificationEventId eventId = getHmsClient().getCurrentNotificationEventId(); + if (eventId != null && eventId.isSetEventId()) { + return eventId.getEventId(); + } + + return SentryStore.EMPTY_NOTIFICATION_ID; + } + + /* AutoCloseable implementations */ + + @Override + public void close() { + try { + if (hmsClient != null) { + hmsClient.close(); + } + + cache.clear(); + } finally { + hmsClient = null; + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java index 4a8fb95..12bf4a1 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java @@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -242,62 +241,4 @@ class SentryHMSClient implements AutoCloseable { return emptyMap(); } } - - /** - * Returns all HMS notifications with ID greater than the 
specified one - * - * @param notificationId ID of the last notification that was processed. - * @return Collection of new events to be synced - */ - Collection<NotificationEvent> getNotifications(long notificationId) throws Exception { - if (client == null) { - LOGGER.error(NOT_CONNECTED_MSG); - return Collections.emptyList(); - } - - LOGGER.debug("Checking for notifications beyond {}", notificationId); - - // A bug HIVE-15761 (fixed on Hive 2.4.0) should allow requesting notifications using - // an unprocessed notification ID without causing an exception. For now, we just - // leave this workaround and log debug messages. - CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); - LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId()); - if (eventId != null && eventId.getEventId() < notificationId) { - LOGGER.debug("Last notification of HMS is smaller than what sentry processed, Something is" - + "wrong. Sentry will request a full Snapshot"); - return Collections.emptyList(); - } - - if (eventId != null && eventId.getEventId() == notificationId) { - return Collections.emptyList(); - } - - NotificationEventResponse response = - client.getNextNotification(notificationId, Integer.MAX_VALUE, null); - if (response != null && response.isSetEvents()) { - LOGGER.debug("Last Id processed:{}. 
Received collection of notifications, Size:{}", - notificationId, response.getEvents().size()); - return response.getEvents(); - } - - return Collections.emptyList(); - } - - /** - * @return the latest notification Id logged by the HMS - * @throws Exception when an error occurs when talking to the HMS client - */ - long getCurrentNotificationId() throws Exception { - if (client == null) { - LOGGER.error(NOT_CONNECTED_MSG); - return SentryStore.EMPTY_NOTIFICATION_ID; - } - - CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); - if (eventId.isSetEventId()) { - return eventId.getEventId(); - } - - return SentryStore.EMPTY_NOTIFICATION_ID; - } } http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java index f56384a..35f8316 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -32,9 +33,13 @@ import java.util.Set; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; +import 
org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -47,6 +52,7 @@ import org.apache.sentry.hdfs.UniquePathsUpdate; import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -62,6 +68,11 @@ public class TestHMSFollower { private final SentryStore sentryStore = Mockito.mock(SentryStore.class); private static HiveSimpleConnectionFactory hiveConnectionFactory; + private final static HiveConnectionFactory hmsConnectionMock + = Mockito.mock(HiveConnectionFactory.class); + private final static HiveMetaStoreClient hmsClientMock + = Mockito.mock(HiveMetaStoreClient.class); + @BeforeClass public static void setup() throws IOException, LoginException { hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf()); @@ -73,6 +84,12 @@ public class TestHMSFollower { configuration.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); } + @Before + public void setupMocks() throws Exception { + reset(hmsConnectionMock, hmsClientMock); + when(hmsConnectionMock.connect()).thenReturn(new HMSClient(hmsClientMock)); + } + @Test public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws Exception { /* @@ -91,12 +108,15 @@ public class TestHMSFollower { snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); 
PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); - when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, - hiveConnectionFactory, hiveInstance); + hmsConnectionMock, hiveInstance); hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty @@ -140,12 +160,15 @@ public class TestHMSFollower { snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); - when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, - hiveConnectionFactory, hiveInstance); + hmsConnectionMock, hiveInstance); hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot @@ -189,21 +212,29 @@ public class TestHMSFollower { snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + 
SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); - when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); - when(sentryHmsClient.getNotifications(SENTRY_PROCESSED_EVENT_ID)) - .thenReturn(Collections.singletonList( - new NotificationEvent(fullSnapshot.getId(), 0, "", ""))); + + when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( + new NotificationEvent(fullSnapshot.getId(), 0, "", "") + ) + )); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, - hiveConnectionFactory, hiveInstance); + hmsConnectionMock, hiveInstance); hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot when(sentryStore.getLastProcessedNotificationID()) .thenReturn(SENTRY_PROCESSED_EVENT_ID); - when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); @@ -771,13 +802,16 @@ public class TestHMSFollower { snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); - when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId()); Configuration configuration = new 
Configuration(); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, - hiveConnectionFactory, hiveInstance); + hmsConnectionMock, hiveInstance); hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java new file mode 100644 index 0000000..83a1bec --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java @@ -0,0 +1,163 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + <p> + http://www.apache.org/licenses/LICENSE-2.0 + <p> + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +package org.apache.sentry.service.thrift; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.sentry.hdfs.UniquePathsUpdate; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestHiveNotificationFetcher { + @Test + public void testGetEmptyNotificationsWhenHmsReturnsANullResponse() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + List<NotificationEvent> events; + + Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) + .thenReturn(null); + + events = fetcher.fetchNotifications(0); + assertTrue(events.isEmpty()); + } + } + + @Test + public void testGetEmptyNotificationsWhenHmsReturnsEmptyEvents() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + 
List<NotificationEvent> events; + + Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) + .thenReturn(new NotificationEventResponse(Collections.<NotificationEvent>emptyList())); + + events = fetcher.fetchNotifications(0); + assertTrue(events.isEmpty()); + } + } + + @Test + public void testGetAllNotificationsReturnedByHms() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + List<NotificationEvent> events; + + Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), + new NotificationEvent(2L, 0, "CREATE_TABLE", "") + ) + )); + + events = fetcher.fetchNotifications(0); + assertEquals(2, events.size()); + assertEquals(1, events.get(0).getEventId()); + assertEquals("CREATE_DATABASE", events.get(0).getEventType()); + assertEquals(2, events.get(1).getEventId()); + assertEquals("CREATE_TABLE", events.get(1).getEventType()); + } + } + + @Test + public void testGetDuplicatedEventsAndFilterEventsAlreadySeen() throws Exception { + final SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + List<NotificationEvent> events; + + /* + * Requesting an ID > 0 will request all notifications from 0 again but filter those + * already 
seen notifications with ID = 1 + */ + + // This mock will also test that the NotificationFilter works as expected + Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; + NotificationEventResponse response = new NotificationEventResponse(); + + List<NotificationEvent> events = Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), + new NotificationEvent(1L, 0, "CREATE_TABLE", ""), + new NotificationEvent(2L, 0, "ALTER_TABLE", "") + ); + + for (NotificationEvent event : events) { + String hash = UniquePathsUpdate.sha1(event); + + // We simulate that CREATE_DATABASE is already processed + if (event.getEventType().equals("CREATE_DATABASE")) { + Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(true); + } else { + Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(false); + } + + if (filter.accept(event)) { + response.addToEvents(event); + } + } + + return response; + } + }); + + events = fetcher.fetchNotifications(1); + assertEquals(2, events.size()); + assertEquals(1, events.get(0).getEventId()); + assertEquals("CREATE_TABLE", events.get(0).getEventType()); + assertEquals(2, events.get(1).getEventId()); + assertEquals("ALTER_TABLE", events.get(1).getEventType()); + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb07b4a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java index 77a2bbb..38668ca 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHMSClient.java @@ -25,20 +25,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hive.hcatalog.messaging.HCatEventMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.thrift.TException; import org.junit.Assert; @@ -56,7 +50,6 @@ import javax.security.auth.login.LoginException; public class TestSentryHMSClient { private static final Configuration conf = new Configuration(); - private static final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); private static SentryHMSClient client; private static MockHMSClientFactory hiveConnectionFactory; @@ -108,36 +101,6 @@ public class TestSentryHMSClient { return partition; } - /** - * Creates create database notification - * - * @return NotificationEvent - */ - 
private static NotificationEvent getCreateDatabaseNotification(long id) { - Random rand = new Random(); - int n = rand.nextInt(100) + 1; - String dbName = "db" + n; - return new NotificationEvent(id, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(), - messageFactory - .buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///" + dbName, null)) - .toString()); - } - - /** - * Creates drop database notification - * - * @return NotificationEvent - */ - private static NotificationEvent getDropDatabaseNotification(long id) { - Random rand = new Random(); - int n = rand.nextInt(100) + 1; - String dbName = "db" + n; - return new NotificationEvent(id, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(), - messageFactory - .buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///" + dbName, null)) - .toString()); - } - @BeforeClass static public void initialize() throws IOException, LoginException { hiveConnectionFactory = new MockHMSClientFactory(); @@ -235,94 +198,6 @@ public class TestSentryHMSClient { } /** - * Test scenario when there is no HMS connection - * Getting new notifications - */ - @Test - public void testGetNewNotificationsWithOutClientConnected() throws Exception { - HiveTable tab21 = new HiveTable("tab21"); - HiveTable tab31 = new HiveTable("tab31"); - HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31)); - HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21)); - HiveDb db1 = new HiveDb("db1"); - client.setClient(null); - HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3); - MockClient mockClient = new MockClient(snap, 100); - Mockito.when(mockClient.client.getCurrentNotificationEventId()). 
- thenReturn(new CurrentNotificationEventId(mockClient.eventId)); - // Make sure that client is not connected - Assert.assertTrue(!client.isConnected()); - Collection<NotificationEvent> events = client.getNotifications(100); - Assert.assertTrue(events.isEmpty()); - - } - - /** - * Test scenario where there are no notifications - * Getting new notifications - */ - @Test - public void testGetNewNotificationsWithNoHmsUpdates() throws Exception { - HiveTable tab21 = new HiveTable("tab21"); - HiveTable tab31 = new HiveTable("tab31"); - HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31)); - HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21)); - HiveDb db1 = new HiveDb("db1"); - HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3); - MockClient mockClient = new MockClient(snap, 100); - Mockito.when(mockClient.client.getCurrentNotificationEventId()). - thenReturn(new CurrentNotificationEventId(mockClient.eventId)); - client.setClient(mockClient.client); - hiveConnectionFactory.setClient(mockClient); - // Make sure that client is connected - Assert.assertTrue(client.isConnected()); - Collection<NotificationEvent> events = client.getNotifications(100); - Assert.assertTrue(events.isEmpty()); - } - - /** - * Test scenario where there are notifications - * Getting new notifications - */ - @Test - public void testGetNewNotificationsSuccess() throws Exception { - final MockClient mockClient = new MockClient(new HiveSnapshot(), 100); - client.setClient(mockClient.client); - hiveConnectionFactory.setClient(mockClient); - // Make sure that client is connected - Assert.assertTrue(client.isConnected()); - - Mockito.when(mockClient.client.getCurrentNotificationEventId()). 
- thenAnswer(new Answer<CurrentNotificationEventId>() { - @Override - public CurrentNotificationEventId answer(InvocationOnMock invocation) - throws Throwable { - return new CurrentNotificationEventId(mockClient.incrementNotificationEventId()); - } - }); - Mockito.when(mockClient.client.getNextNotification(Mockito.anyLong(), Mockito.anyInt(), - Mockito.any(NotificationFilter.class))). - thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - long id = 1; - List<NotificationEvent> events = new ArrayList<>(); - events.add(getCreateDatabaseNotification(id++)); - events.add(getDropDatabaseNotification(id++)); - return new NotificationEventResponse(events); - } - }); - - Collection<NotificationEvent> events = client.getNotifications(100); - long id = 1; - for (NotificationEvent event : events) { - Assert.assertEquals(event.getEventId(), id++); - } - Assert.assertTrue(events.size() == 2); - } - - /** * Representation of a Hive table. A table has a name and a list of partitions. */ private static class HiveTable {
