http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestCounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestCounterWait.java new file mode 100644 index 0000000..8940154 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestCounterWait.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.provider.db.service.persistent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Test; + +/** + * Tests for the CounterWait class: blocking waitFor(), wake-ups on update(), and the waitFor() timeout + */ +public class TestCounterWait { + // Used to verify that wakeups happen in the right order + private final BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>(); + + @Test + public void testWaitFor() throws Exception { + // Create a thread for each waiter + int nthreads = 20; + ExecutorService executor = Executors.newFixedThreadPool(nthreads); + + final CounterWait waiter = new CounterWait(); + + // Initial value is zero, so this shouldn't block + assertEquals(0, waiter.waitFor(0)); + + // Create a pair of threads waiting for each value in [1, nthreads / 2] + // We use a pair of threads per value to verify that both are woken up + for (int i = 0; i < nthreads; i++) { + int finalI = i + 2; + final int val = finalI / 2; + executor.execute(new Runnable() { + public void run() { + long r = 0; + try { + r = waiter.waitFor(val); // blocks + } catch (InterruptedException | TimeoutException e) { + e.printStackTrace(); + } + outSyncQueue.add(r); // Once we wake up, post result + } + } + ); + } + + // Wait until all threads are asleep. + while(waiter.waitersCount() < nthreads) { + sleep(20); + } + + // All threads should be blocked, so outSyncQueue should be empty + assertTrue(outSyncQueue.isEmpty()); + + // Post a counter update for each value in [ 1, nthreads / 2 ] + // After each update two threads should be woken up and the corresponding pair of + // values should appear in the outSyncQueue. 
+ for (int i = 0; i < (nthreads / 2); i++) { + waiter.update(i + 1); + long r = outSyncQueue.takeFirst(); + assertEquals(i + 1, r); + r = outSyncQueue.takeFirst(); + assertEquals(i + 1, r); + assertTrue(outSyncQueue.isEmpty()); + } + + // We are done + executor.shutdown(); + } + + // Test for waitFor() timeout throwing TimeoutException + @Test(expected = TimeoutException.class) + public void testWaitForWithTimeout() throws Exception { + CounterWait waiter = new CounterWait(1, TimeUnit.MILLISECONDS); + waiter.waitFor(1); // Should throw exception + } + + private void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { // deliberately ignored: this is a best-effort pause inside a polling loop + } + } +}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java new file mode 100644 index 0000000..7903078 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java @@ -0,0 +1,1064 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.provider.db.service.persistent; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; +import org.apache.sentry.binding.hive.conf.HiveAuthzConf; +import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; +import org.apache.sentry.core.common.utils.PubSub; +import org.apache.sentry.hdfs.UniquePathsUpdate; +import org.apache.sentry.service.thrift.SentryHMSClient; +import org.apache.sentry.service.thrift.HiveConnectionFactory; +import org.apache.sentry.service.thrift.HiveSimpleConnectionFactory; +import org.apache.sentry.service.thrift.ServiceConstants; +import 
org.apache.sentry.service.thrift.HMSClient; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; +import static org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.security.auth.login.LoginException; + +public class TestHMSFollower { + + private final static String hiveInstance = "server2"; + private final static Configuration configuration = new Configuration(); + private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); + private final SentryStore sentryStore = Mockito.mock(SentryStore.class); + private static HiveSimpleConnectionFactory hiveConnectionFactory; + + private final static HiveConnectionFactory hmsConnectionMock + = Mockito.mock(HiveConnectionFactory.class); + private final static HiveMetaStoreClient hmsClientMock + = Mockito.mock(HiveMetaStoreClient.class); + + @BeforeClass + public static void setup() throws IOException, LoginException { + hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf()); + hiveConnectionFactory.init(); + configuration.set("sentry.hive.sync.create", "true"); + configuration.set(SENTRY_SERVICE_FULL_UPDATE_PUBSUB, "true"); + + // enable HDFS sync, so perm and path changes will be saved into DB + configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + configuration.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + } + + @Before + public void setupMocks() throws Exception { + reset(hmsConnectionMock, hmsClientMock); + when(hmsConnectionMock.connect()).thenReturn(new HMSClient(hmsClientMock)); + } + + @Test + public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws Exception { + /* + * TEST CASE + * + * Simulates (by 
using mocks) that Sentry has not processed any notifications, so this + * should trigger a new full HMS snapshot request with the eventId = 1 + */ + + final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot because AuthzPathsMapping is empty + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + // Saving notificationID is in the same transaction of saving full snapshot + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, 
times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + @Test + public void testPersistAFullSnapshotWhenFullSnapshotTrigger() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) the following: + * + * HMS client always returns the paths image with the eventId == 1. + * + * On the 1st run: Sentry has not processed any notifications, so this + * should trigger a new full HMS snapshot request with the eventId = 1 + * + * On the 2nd run: Sentry store returns the latest eventId == 1, + * which matches the eventId returned by HMS client. Because of the match, + * no full update is triggered. + * + * On the 3d run: before the run, full update flag in HMSFollower is set via + * publish-subscribe mechanism. + * Sentry store still returns the latest eventId == 1, + * which matches the eventId returned by HMS client. Because of the match, + * no full update should be triggered. However, because of the trigger set, + * a new full HMS snapshot will be triggered. + * + * On the 4th run: Sentry store returns the latest eventId == 1, + * which matches the eventId returned by HMS client. Because of the match, + * no full update is triggered. This is to check that forced trigger set + * for run 3 only works once. 
+ * + */ + + final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot because AuthzPathsMapping is empty + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + // Saving notificationID is in the same transaction of saving full snapshot + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + + 
reset(sentryStore); + + // 3d run should not get a snapshot because is already processed, + // but because of full update trigger it will, as in the first run + PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message"); + + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + + // 4th run should not get a snapshot because is already processed and publish-subscribe + // trigger is only supposed to work once. This is exactly as 2nd run. + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + + } + + @Test + public void testPersistAFullSnapshotWhenAuthzsnapshotIsEmptyAndHDFSSyncIsEnabled() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) the following: + * + * Disable HDFSSync before triggering a full snapshot + * + * HMS client always returns the paths image with the eventId == 1. + * + * On the 1st run: Sentry notification table is empty, so this + * should trigger a new full HMS snapshot request with the eventId = 1 + * but it should not persist it, in stead only set last + * last processed notification Id. 
This will prevent a + * unless until notifications are out of sync or hdfs sync is enabled + * + * On the 2nd run: Just enable hdfs sync and a full snapshot should be triggered + * because MAuthzPathsSnapshotId table is empty + * + */ + + configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, ""); + configuration.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, ""); + + final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot because hms notificaions is empty + // but it should never be persisted because HDFS sync is disabled + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + // Since hdfs sync is disabled we would set last processed notifications + // and since we did trigger createFullSnapshot() method we won't process any notifications + verify(sentryStore, times(1)).setLastProcessedNotificationID(fullSnapshot.getId()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + + //Re-enable HDFS Sync and simply start the HMS follower 
thread, full snap shot + // should be triggered because MAuthzPathsSnapshotId table is empty + configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + configuration.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + //Create a new hmsFollower instance since configuration is changing + hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + + //Set last processed notification Id to match the full new value 1L + final long LATEST_EVENT_ID = 1L; + when(sentryStore.getLastProcessedNotificationID()).thenReturn(LATEST_EVENT_ID); + //Mock that sets isHmsNotificationEmpty to false + when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); + // Mock that sets the current HMS notification ID. Set it to match + // last processed notification Id so that doesn't trigger a full snapshot + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(LATEST_EVENT_ID)); + //Mock that sets getting next notification eve + when(hmsClientMock.getNextNotification(Mockito.eq(HMS_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( + new NotificationEvent(LATEST_EVENT_ID, 0, "", "") + ) + )); + //Mock that sets isAuthzPathsSnapshotEmpty to true so trigger this particular test + when(sentryStore.isAuthzPathsSnapshotEmpty()).thenReturn(true); + + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage( + fullSnapshot.getPathImage(), fullSnapshot.getId()); + verify(sentryStore, times(0)).setLastProcessedNotificationID(fullSnapshot.getId()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId()); + + reset(sentryStore); + } + + @Test + public void 
testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed() + throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) that Sentry already processed (and persisted) a notification + * with Id = 5, but the latest notification processed by the HMS is eventId = 1. So, an + * out-of-sync issue is happening on Sentry and HMS. This out-of-sync issue should trigger + * a new full HMS snapshot request with the same eventId = 1; + */ + + final long SENTRY_PROCESSED_EVENT_ID = 5L; + final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + + reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + 
when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + @Test + public void testPersistAFullSnapshotWhenNextExpectedEventIsNotAvailable() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) that Sentry already processed (and persisted) a notification + * with Id = 1, and the latest notification processed by the HMS is eventId = 5. So, new + * notifications should be fetched. + * + * The number of expected notifications should be 4, but we simulate that we fetch only one + * notification with eventId = 5 causing an out-of-sync because the expected notificatoin + * should be 2. This out-of-sync should trigger a new full HMS snapshot request with the + * same eventId = 5. + */ + + final long SENTRY_PROCESSED_EVENT_ID = 1L; + final long HMS_PROCESSED_EVENT_ID = 5L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( + new NotificationEvent(fullSnapshot.getId(), 0, "", "") + ) + )); + + HMSFollower hmsFollower = new 
HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot + when(sentryStore.getLastProcessedNotificationID()) + .thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + + reset(sentryStore); + + // 2nd run should not get a snapshot because is already processed + when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + /** + * Test that HMSFollower uses the input authentication server name when it is not null + */ + @Test + public void testInputConfigurationGetInputAuthServerName() { + Configuration sentryConfiguration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + String authServerName = hmsFollower.getAuthServerName(); + + Assert.assertEquals(true, authServerName.equals(hiveInstance)); + } + + /** + * Test that HMSFollower uses the default authentication server name when its constructor input + * value is null and the configuration does not configure AUTHZ_SERVER_NAME nor + * AUTHZ_SERVER_NAME_DEPRECATED + */ + @Test + public void testNoConfigurationGetDefaultAuthServerName() { + Configuration sentryConfiguration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null, + hiveConnectionFactory, null); + String authServerName = 
hmsFollower.getAuthServerName(); + + Assert.assertEquals(true, authServerName.equals(AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getDefault())); + } + + /** + * Test that HMSFollower uses the configured authentication server name when its constructor input + * value is null and the configuration contains configuration for AUTHZ_SERVER_NAME + */ + @Test + public void testNewNameConfigurationGetAuthServerName() { + String serverName = "newServer"; + Configuration sentryConfiguration = new Configuration(); + sentryConfiguration.set(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar(), serverName); + HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null, + hiveConnectionFactory, null); + String authServerName = hmsFollower.getAuthServerName(); + + Assert.assertEquals(true, authServerName.equals(serverName)); + } + + /** + * Test that HMSFollower uses the configured deprecated authentication server name when its constructor input + * value is null and the configuration contains configuration for AUTHZ_SERVER_NAME_DEPRECATED + */ + @Test + public void testOldNameConfigurationGetAuthServerName() { + String serverName = "oldServer"; + Configuration sentryConfiguration = new Configuration(); + sentryConfiguration.set(AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getVar(), serverName); + HMSFollower hmsFollower = new HMSFollower(sentryConfiguration, sentryStore, null, + hiveConnectionFactory, null); + String authServerName = hmsFollower.getAuthServerName(); + + Assert.assertEquals(true, authServerName.equals(serverName)); + } + + /** + * Constructs a create database event and makes sure that the appropriate sentry store APIs + * are invoked when the event is processed by hms follower. 
+ * + * @throws Exception + */ + @Test + public void testCreateDatabase() throws Exception { + String dbName = "db1"; + + // Build a single CREATE_DATABASE notification event (event id 1) for database db1 + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.CREATE_DATABASE.toString(), + messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)) + .toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); // hand the event to the follower under test + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); // authorizable the store call is expected to receive + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + verify(sentryStore, times(1)) // exactly one dropPrivilege call is expected on the mocked store + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + /** + * Constructs a drop database event and makes sure that the appropriate sentry store APIs + * are invoked when the event is processed by hms follower. 
+ * + * @throws Exception + */ + @Test + public void testDropDatabase() throws Exception { + String dbName = "db1"; + + // Build a single DROP_DATABASE notification event (event id 1) for database db1 + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.DROP_DATABASE.toString(), + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)) + .toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); // hand the event to the follower under test + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); // authorizable the store call is expected to receive + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + verify(sentryStore, times(1)) // exactly one dropPrivilege call is expected on the mocked store + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + /** + * Constructs a create table event and makes sure that the appropriate sentry store APIs + * are invoked when the event is processed by hms follower. 
+ * + * @throws Exception + */ + @Test + public void testCreateTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + // Build a single CREATE_TABLE notification event (event id 1) for db1.table1 + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + Collections.emptyIterator()).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); // hand the event to the follower under test + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); // authorizable the store call is expected to receive + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + verify(sentryStore, times(1)) // exactly one dropPrivilege call is expected on the mocked store + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + /** + * Constructs a drop table event and makes sure that the appropriate sentry store APIs + * are invoked when the event is processed by hms follower. 
+ * + * @throws Exception + */ + @Test + public void testDropTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + // Build a single DROP_TABLE notification event (event id 1) for db1.table1 + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.DROP_TABLE.toString(), + messageFactory.buildDropTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); // hand the event to the follower under test + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); // authorizable the store call is expected to receive + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + verify(sentryStore, times(1)) // exactly one dropPrivilege call is expected on the mocked store + .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + /** + * Constructs a rename table event and makes sure that the appropriate sentry store APIs + * are invoked when the event is processed by hms follower. 
+ * + * @throws Exception + */ + @Test + public void testRenameTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + String newDbName = "db1"; + String newTableName = "table2"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); + notificationEvent.setDbName(newDbName); + notificationEvent.setTableName(newTableName); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + + verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, + NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable)); + } + + + @Ignore + /** + * Constructs a bunch of events and passed to processor of hms follower. One of those is alter + * partition event with out actually changing anything(invalid event). Idea is to make sure that + * hms follower calls appropriate sentry store API's for the events processed by hms follower + * after processing the invalid alter partition event. 
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAlterPartitionWithInvalidEvent() throws Exception {
+    String dbName = "db1";
+    String tableName1 = "table1";
+    String tableName2 = "table2";
+    // Event id handed to the next notification; bumped after each of the first four events.
+    long inputEventId = 1;
+    List<NotificationEvent> events = new ArrayList<>();
+    NotificationEvent notificationEvent;
+    List<FieldSchema> partCols;
+    StorageDescriptor sd;
+    // Stub the store calls this test verifies so that they do nothing when invoked.
+    Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+    //noinspection unchecked
+    Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
+        Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
+
+    // A local, default Configuration (shadows any field of the same name) drives this follower.
+    Configuration configuration = new Configuration();
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+
+    // ---- Step 1: CREATE_TABLE for db1.table1 (partitioned by "ds") ----
+    // Create a table
+    sd = new StorageDescriptor();
+    sd.setLocation("hdfs://db1.db/table1");
+    partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
+        null);
+    notificationEvent = new NotificationEvent(inputEventId, 0,
+        EventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(table, Collections.emptyIterator()).toString());
+    notificationEvent.setDbName(dbName);
+    notificationEvent.setTableName(tableName1);
+    events.add(notificationEvent);
+    inputEventId += 1;
+    // Process the notification
+    hmsFollower.processNotifications(events);
+    // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
+    // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
+    verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
+        Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+    // Clear recorded interactions (and stubbing) so the next step is verified in isolation.
+    reset(sentryStore);
+    events.clear();
+
+    // ---- Step 2: ADD_PARTITION for db1.table1 ----
+    // Create a partition
+    List<Partition> partitions = new ArrayList<>();
+    // NOTE(review): invalidSd is configured but never used below; the partition is built with
+    // the valid 'sd'. Confirm whether it is a leftover that can be removed.
+    StorageDescriptor invalidSd = new StorageDescriptor();
+    invalidSd.setLocation(null);
+    Partition partition = new Partition(Collections.singletonList("today"), dbName, tableName1,
+        0, 0, sd, null);
+    partitions.add(partition);
+    notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(),
+        messageFactory.buildAddPartitionMessage(table, partitions.iterator(), Collections.emptyIterator()).toString());
+    notificationEvent.setDbName(dbName);
+    notificationEvent.setTableName(tableName1);
+    events.add(notificationEvent);
+    inputEventId += 1;
+    // Process the notification
+    hmsFollower.processNotifications(events);
+    // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification
+    // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
+    verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
+        Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+    reset(sentryStore);
+    events.clear();
+
+    // ---- Step 3: ALTER_PARTITION where old and new partition are the same object ----
+    // Create an alter notification without actually changing anything.
+    // NOTE(review): the original comment said this invalid event "should be processed by sentry
+    // store"; the only assertion below is that its event id is persisted explicitly, so it
+    // presumably meant "should not be processed" - confirm.
+    // Event Id should be explicitly persisted using persistLastProcessedNotificationID
+    notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
+        messageFactory.buildAlterPartitionMessage(table, partition, partition).toString());
+    notificationEvent.setDbName(dbName);
+    notificationEvent.setTableName(tableName1);
+    events.add(notificationEvent);
+    inputEventId += 1;
+    // Process the notification
+    hmsFollower.processNotifications(events);
+    // Make sure that persistLastProcessedNotificationID is invoked explicitly.
+    // inputEventId was already incremented, hence "- 1" to refer to the event just processed.
+    verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
+    reset(sentryStore);
+    events.clear();
+
+    // ---- Step 4: ALTER_PARTITION that really changes the partition location ----
+    // Create an alter notification with some actual change.
+    sd = new StorageDescriptor();
+    sd.setLocation("hdfs://user/hive/warehouse/db1.db/table1");
+    Partition updatedPartition = new Partition(partition);
+    updatedPartition.setSd(sd);
+    notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
+        messageFactory.buildAlterPartitionMessage(table, partition, updatedPartition).toString());
+    notificationEvent.setDbName(dbName);
+    notificationEvent.setTableName(tableName1);
+    events.add(notificationEvent);
+    inputEventId += 1;
+    // Process the notification
+    hmsFollower.processNotifications(events);
+    // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION
+    // notification and persistLastProcessedNotificationID was not invoked.
+    verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(),
+        Mockito.anyString(), Mockito.anyString(), Mockito.any(UniquePathsUpdate.class));
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1);
+    reset(sentryStore);
+    events.clear();
+
+    // ---- Step 5: CREATE_TABLE for db1.table2, processed after the invalid event ----
+    // Create a table
+    sd = new StorageDescriptor();
+    sd.setLocation("hdfs://db1.db/table2");
+    partCols = new ArrayList<>();
+    partCols.add(new FieldSchema("ds", "string", ""));
+    Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
+        null);
+    notificationEvent = new NotificationEvent(inputEventId, 0,
+        EventMessage.EventType.CREATE_TABLE.toString(),
+        messageFactory.buildCreateTableMessage(table1, Collections.emptyIterator()).toString());
+    notificationEvent.setDbName(dbName);
+    notificationEvent.setTableName(tableName2);
+    events.add(notificationEvent);
+    // Process the notification
+    hmsFollower.processNotifications(events);
+    // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
+    // and persistLastProcessedNotificationID was not invoked.
+    //noinspection unchecked
+    verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
+        Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class));
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+  }
+
+  /**
+   * Constructs a bunch of events and passes them to the processor of hms follower. One of those
+   * is an alter table event without actually changing anything (invalid event). Idea is to make
+   * sure that hms follower calls appropriate sentry store API's for the events processed by hms
+   * follower after processing the invalid alter table event.
+ * + * @throws Exception + */ + @Test + public void testAlterTableWithInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName1 = "table1"; + String tableName2 = "table2"; + long inputEventId = 1; + List<NotificationEvent> events = new ArrayList<>(); + NotificationEvent notificationEvent; + List<FieldSchema> partCols; + StorageDescriptor sd; + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + //noinspection unchecked + Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + + Configuration configuration = new Configuration(); + // enable HDFS sync, so perm and path changes will be saved into DB + configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + configuration.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, + null); + notificationEvent = new NotificationEvent(inputEventId, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table, Collections.emptyIterator()).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotifications(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. 
+ //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create alter table notification with out actually changing anything. + // This notification should not be processed by sentry server + // Notification should be persisted explicitly + notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + events = new ArrayList<>(); + events.add(notificationEvent); + inputEventId += 1; + // Process the notification + hmsFollower.processNotifications(events); + // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked + // to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID is explicitly invoked + verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(), + Mockito.any(UniquePathsUpdate.class)); + //noinspection unchecked + verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong()); + reset(sentryStore); + events.clear(); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table2"); + partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, + null); + notificationEvent = new 
NotificationEvent(inputEventId, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1, Collections.emptyIterator()).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName2); + events.add(notificationEvent); + // Process the notification + hmsFollower.processNotifications(events); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); + } + + /** + * Constructs a two events and passed to processor of hms follower. First one being create table + * event with location information(Invalid Event). Idea is to make sure that hms follower calls + * appropriate sentry store API's for the event processed by hms follower after processing the + * invalid create table event. + * + * @throws Exception + */ + @Test + public void testCreateTableAfterInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + long inputEventId = 1; + + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + //noinspection unchecked + Mockito.doNothing().when(sentryStore) + .addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(), + Mockito.any(UniquePathsUpdate.class)); + + // Create invalid notification event. 
The location of the storage descriptor is null, which is invalid for creating table + StorageDescriptor invalidSd = new StorageDescriptor(); + invalidSd.setLocation(null); + NotificationEvent invalidNotificationEvent = new NotificationEvent(inputEventId, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, invalidSd, null, null, null, null, null), + Collections.emptyIterator()).toString()); + + // Create valid notification event + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + inputEventId += 1; + NotificationEvent notificationEvent = new NotificationEvent(inputEventId, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + Collections.emptyIterator()).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(invalidNotificationEvent); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); + + // invalid event updates notification ID directly + verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); + + // next valid event update path, which updates notification ID + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(), + Mockito.any(UniquePathsUpdate.class)); + } + + @Test + public void testNoHdfsNoPersistAFullSnapshot() throws Exception { + /* + * TEST CASE + * + * Simulates (by using mocks) that Sentry has not processed any notifications, so this + * should trigger a new full HMS snapshot request with the eventId = 1 + */ + + final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID; + 
final long HMS_PROCESSED_EVENT_ID = 1L; + + // Mock that returns a full snapshot + Map<String, Collection<String>> snapshotObjects = new HashMap<>(); + snapshotObjects.put("db", Sets.newHashSet("/db")); + snapshotObjects.put("db.table", Sets.newHashSet("/db/table")); + PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1); + + // Mock that returns the current HMS notification ID + when(hmsClientMock.getCurrentNotificationEventId()) + .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId())); + + SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); + when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hmsConnectionMock, hiveInstance); + hmsFollower.setSentryHmsClient(sentryHmsClient); + + // 1st run should get a full snapshot because AuthzPathsMapping is empty + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); + hmsFollower.run(); + verify(sentryStore, times(0)).persistFullPathsImage(fullSnapshot.getPathImage(), fullSnapshot.getId()); + verify(sentryStore, times(1)).setLastProcessedNotificationID(fullSnapshot.getId()); + verify(sentryStore, times(1)).isHmsNotificationEmpty(); + verify(sentryStore, times(0)).isAuthzPathsMappingEmpty(); + } + + @Test + public void testNoHdfsSyncAlterTableNotPersisted() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + String newDbName = "db1"; + String newTableName = "table2"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.ALTER_TABLE.toString(), + 
messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); + notificationEvent.setDbName(newDbName); + notificationEvent.setTableName(newTableName); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, + hiveConnectionFactory, hiveInstance); + hmsFollower.processNotifications(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + + verify(sentryStore, times(0)).renamePrivilege(authorizable, newAuthorizable, + NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable)); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java index 501898b..91c90f9 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java +++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java @@ -36,7 +36,6 @@ import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFacto import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.service.thrift.HiveSimpleConnectionFactory; import org.apache.sentry.provider.file.PolicyFile; -import org.apache.sentry.service.thrift.HMSFollower; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.junit.After; import org.junit.AfterClass; http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestLeaderStatusMonitor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestLeaderStatusMonitor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestLeaderStatusMonitor.java new file mode 100644 index 0000000..395516c --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestLeaderStatusMonitor.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.Thread.sleep; +import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit Tests for LeaderStatusMonitor. + * Use Curator TestingServer as Zookeeper Server. + */ +@SuppressWarnings("NestedTryStatement") +public class TestLeaderStatusMonitor { + private static final Logger LOGGER = LoggerFactory.getLogger(TestLeaderStatusMonitor.class); + + // Delay between retries + private static final int DELAY_MS = 500; + // Maximum number of tries before giving up while waiting for leader + private static final int NTRIES = 360; + // Number of times test is repeated + private static final int ITERATIONS = 10; + + /** + * Wait for some time (u to 500 seconds) until the monitor becomes active + * @param monitor HA monitor + * @return true if monitor is active, false otherwise + */ + @SuppressWarnings("squid:S2925") + private boolean isLeader(LeaderStatusMonitor monitor) { + for (int i = 0; i < NTRIES; i++) { + if (monitor.isLeader()) { + return true; + } + try { + sleep(DELAY_MS); + } catch (InterruptedException ignored) { + Thread.interrupted(); + } + } + return false; + } + + /** + * Simple test case - leader monitor without Zookeeper. 
+ * Should always be the leader. + * @throws Exception + */ + @Test + public void testNoZk() throws Exception { + Configuration conf = new Configuration(); + LeaderStatusMonitor monitor = new LeaderStatusMonitor(conf); + assertTrue(monitor.isLeader()); + } + + /** + * Single server scenario. + * Should always be the leader. + * Should continue to be the leader after resigning the leadership. + * + * <p> + * <ol> + * <li>Start ZK Server</li> + * <li>Create monitor</li> + * <li>Monitor should become active</li> + * <li>Drop active status</li> + * <li>Monitor should become active again</li> + * </ol> + * @throws Exception + */ + @Test + @SuppressWarnings("squid:S2925") + public void testSingleServer() throws Exception { + try(TestingServer zkServer = new TestingServer()) { + zkServer.start(); + Configuration conf = new Configuration(); + conf.set(SENTRY_HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + try(LeaderStatusMonitor monitor = new LeaderStatusMonitor(conf)) { + monitor.init(); + for (int i = 0; i < ITERATIONS; i++) { + assertTrue(isLeader(monitor)); + LOGGER.debug("testSingleServer(): deactivating leader"); + monitor.deactivate(); + sleep(2 * DELAY_MS); + assertTrue(isLeader(monitor)); + LOGGER.info("testSingleServer({}, leaderCount = {}", i, monitor.getLeaderCount()); + } + assertEquals(ITERATIONS + 1, monitor.getLeaderCount()); + } + } finally { + HAContext.resetHAContext(); + } + } + + /** + * Single server scenario with restarting ZK server + * <p> + * <ol> + * <li>Start ZK Server</li> + * <li>Create monitor</li> + * <li>at some point monitor should become active</li> + * <li>Restart ZK server</li> + * <li>at some point monitor should become active again</li> + * </ol> + * @throws Exception + */ + @Test + public void testSingleServerZkRestart() throws Exception { + try(TestingServer zkServer = new TestingServer()) { + zkServer.start(); + Configuration conf = new Configuration(); + conf.set(SENTRY_HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + 
try(LeaderStatusMonitor monitor = new LeaderStatusMonitor(conf)) { + monitor.init(); + for (int i = 0; i < ITERATIONS; i++) { + assertTrue(isLeader(monitor)); + LOGGER.debug("testSingleServerZkRestart(): restarting Zk server"); + zkServer.restart(); + assertTrue(isLeader(monitor)); + LOGGER.info("testSingleServerZkRestart({}, leaderCount = {}", i, monitor.getLeaderCount()); + assertEquals(i + 2, monitor.getLeaderCount()); + } + } + } finally { + HAContext.resetHAContext(); + } + } + + /** + * Dual server scenario + * <p> + * <ol> + * <li>Start ZK Server</li> + * <li>Create monitor1 and monitor2</li> + * <li>at some point one of monitors should become active</li> + * <li>Drop active status on monitor 2</li> + * <li>Monitor1 should become active</li> + * <li>Drop active status on monitor1</li> + * <li>Monitor2 should become active</li> + * </ol> + * @throws Exception + */ + @Test + @SuppressWarnings("squid:S2925") + public void testTwoServers() throws Exception { + try(TestingServer zkServer = new TestingServer()) { + zkServer.start(); + Configuration conf = new Configuration(); + conf.set(SENTRY_HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + try (LeaderStatusMonitor monitor1 = new LeaderStatusMonitor(conf, "1"); + LeaderStatusMonitor monitor2 = new LeaderStatusMonitor(conf, "2")) { + monitor1.init(); + monitor2.init(); + // Wait until one of monitors is active + for (int i = 0; i < NTRIES; i++) { + if (monitor1.isLeader() || monitor2.isLeader()) { + break; + } + try { + sleep(DELAY_MS); + } catch (InterruptedException ignored) { + Thread.interrupted(); + } + } + + for (int i = 0; i < ITERATIONS; i++) { + monitor2.deactivate(); + assertTrue(isLeader(monitor1)); + monitor1.deactivate(); + assertTrue(isLeader(monitor2)); + } + } + } finally { + HAContext.resetHAContext(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/sentry/blob/c4226f64/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestNotificationProcessor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestNotificationProcessor.java new file mode 100644 index 0000000..923faff --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestNotificationProcessor.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.provider.db.service.persistent; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; +import org.apache.sentry.hdfs.UniquePathsUpdate; +import org.apache.sentry.service.thrift.ServiceConstants; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +// TODO 1. More tests should be added here. +// TODO 2. Tests using actual sentry store where. 
+@SuppressWarnings("unused")
+public class TestNotificationProcessor {
+
+  // Single Mockito mock shared by all tests; its recorded interactions are cleared in
+  // resetConf() after every test.
+  private static final SentryStore sentryStore = Mockito.mock(SentryStore.class);
+  // Server name passed to every NotificationProcessor and TSentryAuthorizable in this class
+  private final static String hiveInstance = "server2";
+  // Shared configuration, populated once in setup()
+  private final static Configuration conf = new Configuration();
+  // Builds the JSON message payloads placed inside the NotificationEvents under test
+  private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
+  // Processor under test; each test method assigns its own instance
+  private NotificationProcessor notificationProcessor;
+
+  /**
+   * One-time initialization of the shared configuration: turns on the
+   * "sentry.hive.sync.create" and "sentry.hive.sync.drop" settings and registers the HDFS
+   * sync processor factory and policy store plugin.
+   */
+  @BeforeClass
+  public static void setup() {
+    conf.set("sentry.hive.sync.create", "true");
+    conf.set("sentry.hive.sync.drop", "true");
+
+    // enable HDFS sync, so perm and path changes will be saved into DB
+    conf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
+    conf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin");
+  }
+
+  /**
+   * Runs after every test: puts both hive sync settings back to "true" in the shared
+   * configuration and clears everything recorded on the shared sentryStore mock.
+   */
+  @After
+  public void resetConf() {
+    conf.set("sentry.hive.sync.create", "true");
+    conf.set("sentry.hive.sync.drop", "true");
+    reset(sentryStore);
+  }
+
+  @Test
+  /*
+   Makes sure that appropriate sentry store methods are invoked when create database event is
+   processed.
+
+   Also, checks the hive sync configuration.
+ */ + public void testCreateDatabase() throws Exception { + long seqNum = 1; + String dbName = "db1"; + String uriPrefix = "hdfs:///"; + String location = "user/hive/warehouse"; + NotificationEvent notificationEvent; + TSentryAuthorizable authorizable; + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, conf); + + // Create notification event + notificationEvent = new NotificationEvent(seqNum, 0, + EventMessage.EventType.CREATE_DATABASE.toString(), + messageFactory.buildCreateDatabaseMessage(new Database(dbName, + null, uriPrefix + location, null)).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + + verify(sentryStore, times(1)).dropPrivilege(authorizable, + NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + reset(sentryStore); + + //Change the configuration and make sure that exiting privileges are not dropped + notificationProcessor.setSyncStoreOnCreate(false); + dbName = "db2"; + notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.CREATE_DATABASE.toString(), + messageFactory.buildCreateDatabaseMessage(new Database(dbName, + null, "hdfs:///db2", null)).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + //making sure that privileges are not dropped + verify(sentryStore, times(0)).dropPrivilege(authorizable, + 
NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + + } + + @Test + /* + Makes sure that appropriate sentry store methods are invoked when drop database event is + processed. + + Also, checks the hive sync configuration. + */ + public void testDropDatabase() throws Exception { + String dbName = "db1"; + + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, conf); + + // Create notification event + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.DROP_DATABASE.toString(), + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, + "hdfs:///db1", null)).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + //noinspection unchecked + verify(sentryStore, times(1)).deleteAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + verify(sentryStore, times(1)).dropPrivilege(authorizable, + NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + reset(sentryStore); + + // Change the configuration and make sure that exiting privileges are not dropped + notificationProcessor.setSyncStoreOnDrop(false); + dbName = "db2"; + // Create notification event + notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.DROP_DATABASE.toString(), + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, + "hdfs:///db2", null)).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + + //noinspection unchecked + verify(sentryStore, times(1)).deleteAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + verify(sentryStore, 
times(0)).dropPrivilege(authorizable, + NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + @Test + /* + Makes sure that appropriate sentry store methods are invoked when create table event is + processed. + + Also, checks the hive sync configuration. + */ + public void testCreateTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, conf); + + // Create notification event + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = + new NotificationEvent(1, 0, EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(new Table(tableName, + dbName, null, 0, 0, 0, sd, null, null, null, null, null), + Collections.emptyIterator()).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + + verify(sentryStore, times(1)).dropPrivilege(authorizable, + NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + reset(sentryStore); + + // Change the configuration and make sure that existing privileges are not dropped + notificationProcessor.setSyncStoreOnCreate(false); + + // Create notification event + dbName = "db2"; + tableName = "table2"; + sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table2"); + notificationEvent = + new NotificationEvent(1, 0, EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(new Table(tableName, + dbName, null, 0, 0, 0, sd, null, null, null, null, null), + 
Collections.emptyIterator()).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + // Making sure that privileges are not dropped + verify(sentryStore, times(0)).dropPrivilege(authorizable, + NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + @Test + /* + Makes sure that appropriate sentry store methods are invoked when drop table event is + processed. + + Also, checks the hive sync configuration. + */ + public void testDropTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + Configuration authConf = new Configuration(); + // enable HDFS sync, so perm and path changes will be saved into DB + authConf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + authConf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, authConf); + + // Create notification event + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.DROP_TABLE.toString(), + messageFactory.buildDropTableMessage(new Table(tableName, + dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + + notificationProcessor.processNotificationEvent(notificationEvent); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + 
verify(sentryStore, times(1)).deleteAllAuthzPathsMapping(Mockito.anyString(), + Mockito.any(UniquePathsUpdate.class)); + + verify(sentryStore, times(1)).dropPrivilege(authorizable, + NotificationProcessor.getPermUpdatableOnDrop(authorizable)); + } + + @Test + /* + Makes sure that appropriate sentry store methods are invoked when alter tables event is + processed. + */ + public void testAlterTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + String newDbName = "db1"; + String newTableName = "table2"; + + Configuration authConf = new Configuration(); + // enable HDFS sync, so perm and path changes will be saved into DB + authConf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + authConf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, authConf); + + // Create notification event + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)) + .toString()); + notificationEvent.setDbName(newDbName); + notificationEvent.setTableName(newTableName); + + notificationProcessor.processNotificationEvent(notificationEvent); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + newAuthorizable.setDb(newDbName); + 
newAuthorizable.setTable(newTableName); + + verify(sentryStore, times(1)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(), + Mockito.any(UniquePathsUpdate.class)); + + verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, + NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable)); + } + + @Test + /* + Makes sure that appropriate sentry store methods are invoked when alter tables event is + processed. + */ + public void testRenameTableWithLocationUpdate() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + String newDbName = "db1"; + String newTableName = "table2"; + + Configuration authConf = new Configuration(); + // enable HDFS sync, so perm and path changes will be saved into DB + authConf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + authConf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, authConf); + + // Create notification event + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs:///db1.db/table1"); + StorageDescriptor new_sd = new StorageDescriptor(); + new_sd.setLocation("hdfs:///db1.db/table2"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, new_sd, null, null, null, null, null)) + .toString()); + notificationEvent.setDbName(newDbName); + notificationEvent.setTableName(newTableName); + + notificationProcessor.processNotificationEvent(notificationEvent); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + 
authorizable.setTable(tableName); + + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + + verify(sentryStore, times(1)).renameAuthzPathsMapping(Mockito.anyString(), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.any(UniquePathsUpdate.class)); + + verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, + NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable)); + } + + @Test + /* + Test to made sure that sentry store is not invoked when invalid alter table event is + processed. + */ + public void testAlterTableWithInvalidEvent() throws Exception { + String dbName = "db1"; + String tableName1 = "table1"; + String tableName2 = "table2"; + long inputEventId = 1; + NotificationEvent notificationEvent; + List<FieldSchema> partCols; + StorageDescriptor sd; + Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong()); + //noinspection unchecked + Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + + Configuration authConf = new Configuration(); + // enable HDFS sync, so perm and path changes will be saved into DB + authConf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + authConf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + notificationProcessor = new NotificationProcessor(sentryStore, + hiveInstance, authConf); + + // Create a table + sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, + null, null, null, null); + 
notificationEvent = new NotificationEvent(inputEventId, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table, Collections.emptyIterator()).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + inputEventId += 1; + // Process the notification + notificationProcessor.processNotificationEvent(notificationEvent); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + reset(sentryStore); + + // Create alter table notification with out actually changing anything. + // This notification should not be processed by sentry server + // Notification should be persisted explicitly + notificationEvent = new NotificationEvent(1, 0, + EventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(tableName1, dbName, null, 0, 0, 0, sd, null, + null, null, null, null)).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName1); + inputEventId += 1; + // Process the notification + notificationProcessor.processNotificationEvent(notificationEvent); + // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked + // to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID is explicitly invoked + verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(), + Mockito.any(UniquePathsUpdate.class)); + //noinspection unchecked + verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + reset(sentryStore); + + // Create a table + sd = 
new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table2"); + partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, + partCols, null, null, null, null); + notificationEvent = new NotificationEvent(inputEventId, 0, + EventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(table1, Collections.emptyIterator()).toString()); + notificationEvent.setDbName(dbName); + notificationEvent.setTableName(tableName2); + // Process the notification + notificationProcessor.processNotificationEvent(notificationEvent); + // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification + // and persistLastProcessedNotificationID was not invoked. + //noinspection unchecked + verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), + Mockito.anyCollection(), Mockito.any(UniquePathsUpdate.class)); + } +}
