Repository: sentry
Updated Branches:
  refs/heads/master 5f64fe9f3 -> 372ffc9b4
SENTRY-1640: Implement HMS Notification barrier on the HMS plugin side (Sergio Pena, reviewed by kalyan kumar kalvagadda)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/372ffc9b
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/372ffc9b
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/372ffc9b

Branch: refs/heads/master
Commit: 372ffc9b4c662b0d076940b7526d2fe58c8a6a09
Parents: 5f64fe9
Author: Sergio Pena <[email protected]>
Authored: Sun Nov 19 15:12:39 2017 -0600
Committer: Sergio Pena <[email protected]>
Committed: Sun Nov 19 15:12:39 2017 -0600

----------------------------------------------------------------------
 ...rySyncHMSNotificationsPostEventListener.java | 230 +++++++++++++++++++
 ...rySyncHMSNotificationsPostEventListener.java | 161 +++++++++++++
 .../thrift/SentryPolicyServiceClient.java       |   9 +
 .../SentryPolicyServiceClientDefaultImpl.java   |  13 ++
 4 files changed, 413 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
new file mode 100644
index 0000000..24d7763
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.apache.sentry.service.thrift.SentryServiceClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
+ * whenever a DDL event happens on the Hive metastore.
+ */
+public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentrySyncHMSNotificationsPostEventListener.class);
+
+  private final HiveAuthzConf authzConf;
+
+  /*
+   * Latest processed ID by the Sentry server. May only increase.
+   *
+   * This listener will track the latest event ID processed by the Sentry server so that it avoids calling
+   * the sync request in case a late thread attempts to synchronize again an already processed ID.
+   *
+   * The variable is shared across threads, so it is an AtomicLong that is only updated through the
+   * compare-and-set loop in updateProcessedId(), which keeps it increasing monotonically.
+   */
+  private final AtomicLong latestProcessedId = new AtomicLong(0);
+
+  /*
+   * A client used for testing purposes only.
+   *
+   * It may be set by unit-tests as a mock object and used to verify that the client methods
+   * were called correctly (see TestSentrySyncHMSNotificationsPostEventListener)
+   */
+  private SentryPolicyServiceClient serviceClient;
+
+  public SentrySyncHMSNotificationsPostEventListener(Configuration config) {
+    super(config);
+
+    if (!(config instanceof HiveConf)) {
+      String error = "Could not initialize Plugin - Configuration is not an instanceof HiveConf";
+      LOGGER.error(error);
+      throw new RuntimeException(error);
+    }
+
+    authzConf = HiveAuthzConf.getAuthzConf((HiveConf)config);
+  }
+
+  @Override
+  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+    syncNotificationEvents(tableEvent, "onCreateTable");
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+    syncNotificationEvents(tableEvent, "onDropTable");
+  }
+
+  @Override
+  public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
+    syncNotificationEvents(dbEvent, "onCreateDatabase");
+  }
+
+  @Override
+  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+    syncNotificationEvents(dbEvent, "onDropDatabase");
+  }
+
+  /**
+   * Requests the Sentry server to synchronize the HMS notifications up to the event's notification ID.
+   * After the sync call, the latest processed ID is stored for future reference to avoid syncing an
+   * ID that was already processed. Failures of the sync call are only logged, not thrown to the HMS.
+   *
+   * @param event An event that contains a DB_NOTIFICATION_EVENT_ID_KEY_NAME value to request.
+   * @param eventName Name of the listener callback that triggered the sync; used only for logging.
+   */
+  private void syncNotificationEvents(ListenerEvent event, String eventName) {
+    // Do not sync notifications if the event has failed.
+    if (failedEvent(event, eventName)) {
+      return;
+    }
+
+    Map<String, String> eventParameters = event.getParameters();
+    if (!eventParameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
+      return;
+    }
+
+    /* If the HMS is running in an active transaction, then we do not want to sync with Sentry
+     * because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
+     * forever or until a read time-out happens. */
+    if (isMetastoreTransactionActive(eventParameters)) {
+      return;
+    }
+
+    long eventId =
+        Long.parseLong(eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
+
+    // This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
+    // already processed the requested eventId.
+    if (eventId <= latestProcessedId.get()) {
+      return;
+    }
+
+    try (SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
+      LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
+      long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
+      LOGGER.debug("Finished Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
+      LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
+
+      updateProcessedId(sentryLatestProcessedId);
+    } catch (Exception e) {
+      // This error is only logged. There is no need to throw an error to Hive because HMS sync is called
+      // after the notification is already generated by Hive (as post-event).
+      LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
+    }
+  }
+
+  /**
+   * @return True if the HMS is calling this notification in an active transaction; false otherwise
+   */
+  private boolean isMetastoreTransactionActive(Map<String, String> parameters) {
+    return Boolean.parseBoolean(
+        parameters.get(MetaStoreEventListenerConstants.HIVE_METASTORE_TRANSACTION_ACTIVE));
+  }
+
+  /**
+   * Updates the latest processed ID, if and only if eventId is bigger. This keeps the contract that
+   * {@link #latestProcessedId} may only increase, even when several threads update it concurrently.
+   *
+   * @param eventId The latest event ID processed by the Sentry server
+   */
+  private void updateProcessedId(long eventId) {
+    long oldVal = latestProcessedId.get();
+    // Retry if another thread changed the value first, unless it already stored an ID >= eventId.
+    while (eventId > oldVal && !latestProcessedId.compareAndSet(oldVal, eventId)) {
+      oldVal = latestProcessedId.get();
+    }
+  }
+
+  /**
+   * Sets the sentry client object (for testing purposes only)
+   *
+   * It may be set by unit-tests as a mock object and used to verify that the client methods
+   * were called correctly (see TestSentrySyncHMSNotificationsPostEventListener).
+   */
+  @VisibleForTesting
+  void setSentryServiceClient(SentryPolicyServiceClient serviceClient) {
+    this.serviceClient = serviceClient;
+  }
+
+  private SentryPolicyServiceClient getSentryServiceClient() throws MetaException {
+    // Return the client injected by the unit tests, if any.
+    if (serviceClient != null) {
+      return serviceClient;
+    }
+
+    try {
+      return SentryServiceClientFactory.create(authzConf);
+    } catch (Exception e) {
+      // Chain the cause explicitly so the original stack trace is not lost.
+      throw (MetaException) new MetaException(
+          "Failed to connect to Sentry service " + e.getMessage()).initCause(e);
+    }
+  }
+
+  private boolean failedEvent(ListenerEvent event, String eventName) {
+    if (!event.getStatus()) {
+      LOGGER.debug("Skip HMS synchronization request with the Sentry server for {} " +
+          "{} since the operation failed. \n", eventName, event);
+      return true;
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
new file mode 100644
index 0000000..cca326b
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Testing class that tests and verifies the sync sentry notifications are called correctly.
+ */
+public class TestSentrySyncHMSNotificationsPostEventListener {
+  private static final boolean FAILED_STATUS = false;
+  private static final boolean SUCCESSFUL_STATUS = true;
+  private static final boolean EVENT_ID_SET = true;
+  private static final boolean EVENT_ID_UNSET = false;
+
+  private SentrySyncHMSNotificationsPostEventListener eventListener;
+  private SentryPolicyServiceClient mockSentryClient;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException, MetaException, SentryUserException {
+    String sentryConfFile = tempFolder.newFile().getAbsolutePath();
+
+    HiveConf hiveConf = new HiveConf(TestSentrySyncHMSNotificationsPostEventListener.class);
+    hiveConf.set(HiveAuthzConf.HIVE_SENTRY_CONF_URL, "file://" + sentryConfFile);
+
+    // Instead of generating an empty sentry-site.xml, we just write the same info from HiveConf.
+    // The SentrySyncHMSNotificationsPostEventListener won't use any information from it after all.
+    try (FileOutputStream out = new FileOutputStream(sentryConfFile)) {
+      hiveConf.writeXml(out);
+    }
+
+    eventListener = new SentrySyncHMSNotificationsPostEventListener(hiveConf);
+
+    mockSentryClient = Mockito.mock(SentryPolicyServiceClient.class);
+
+    // Mockito.spy() on eventListener (to stub getSentryServiceClient()) fails with an "unfinished
+    // stubbing" error when TestURI runs before this test, so the mock is injected directly instead.
+    eventListener.setSentryServiceClient(mockSentryClient);
+  }
+
+  @Test
+  public void testFailedEventsDoNotSyncNotifications() throws MetaException, SentryUserException {
+    callAllEventsThatSynchronize(FAILED_STATUS, EVENT_ID_UNSET);
+    Mockito.verifyZeroInteractions(mockSentryClient);
+  }
+
+  @Test
+  public void testEventsWithoutAnEventIdDoNotSyncNotifications() throws MetaException {
+    callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_UNSET);
+    Mockito.verifyZeroInteractions(mockSentryClient);
+  }
+
+  @Test
+  public void testSuccessfulEventsWithAnEventIdSyncNotifications() throws Exception {
+    long latestEventId = callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_SET);
+
+    for (int i=1; i<=latestEventId; i++) {
+      Mockito.verify(
+        mockSentryClient, Mockito.times(1)
+      ).syncNotifications(i);
+    }
+
+    Mockito.verify(
+      mockSentryClient, Mockito.times((int)latestEventId)
+    ).close();
+
+    Mockito.verifyNoMoreInteractions(mockSentryClient);
+  }
+
+  @Test
+  public void testSyncNotificationsWithNewLatestProcessedIdMayAvoidSyncingCalls() throws Exception {
+    Mockito.doAnswer(new Answer<Long>() {
+      @Override
+      public Long answer(InvocationOnMock invocation) throws Throwable {
+        Long id = (Long)invocation.getArguments()[0];
+        return id + 1;
+      }
+    }).when(mockSentryClient).syncNotifications(Mockito.anyLong());
+
+    long latestEventId = callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_SET);
+
+    for (int i=1; i<=latestEventId; i+=2) {
+      Mockito.verify(
+        mockSentryClient, Mockito.times(1)
+      ).syncNotifications(i);
+    }
+
+    Mockito.verify(
+      mockSentryClient, Mockito.times((int)latestEventId / 2)
+    ).close();
+
+    Mockito.verifyNoMoreInteractions(mockSentryClient);
+  }
+
+  private long callAllEventsThatSynchronize(boolean status, boolean eventIdSet) throws MetaException {
+    long eventId = 0;
+
+    CreateDatabaseEvent createDatabaseEvent = new CreateDatabaseEvent(null, status , null);
+    setEventId(eventIdSet, createDatabaseEvent, ++eventId);
+    eventListener.onCreateDatabase(createDatabaseEvent);
+
+    DropDatabaseEvent dropDatabaseEvent = new DropDatabaseEvent(null, status , null);
+    setEventId(eventIdSet, dropDatabaseEvent, ++eventId);
+    eventListener.onDropDatabase(dropDatabaseEvent);
+
+    CreateTableEvent createTableEvent = new CreateTableEvent(null, status , null);
+    setEventId(eventIdSet, createTableEvent, ++eventId);
+    eventListener.onCreateTable(createTableEvent);
+
+    DropTableEvent dropTableEvent = new DropTableEvent(null, status , false, null);
+    setEventId(eventIdSet, dropTableEvent, ++eventId);
+    eventListener.onDropTable(dropTableEvent);
+
+    return eventId;
+  }
+
+  private void setEventId(boolean eventIdSet, ListenerEvent eventListener, long eventId) {
+    if (eventIdSet) {
+      eventListener.putParameter(
+          MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, String.valueOf(eventId));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 61833fc..f69a8cd 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -215,4 +215,13 @@ public interface SentryPolicyServiceClient extends AutoCloseable {
   // export the sentry mapping data with map structure
   Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath)
       throws SentryUserException;
+
+  /**
+   * Requests the sentry server to synchronize all HMS notification events up to the specified id.
+   * The sentry server returns once it has processed the specified id.
+   *
+   * @param id Requested HMS notification ID.
+   * @return The most recent processed notification ID.
+   */
+  long syncNotifications(long id) throws SentryUserException;
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index 7ada138..bede5b1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -1065,4 +1065,17 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
       transport = null;
     }
   }
+
+  public long syncNotifications(long id) throws SentryUserException {
+    TSentrySyncIDRequest request =
+        new TSentrySyncIDRequest(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, id);
+
+    try {
+      TSentrySyncIDResponse response = client.sentry_sync_notifications(request);
+      Status.throwIfNotOk(response.getStatus());
+      return response.getId();
+    } catch (TException e) {
+      throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
+    }
+  }
 }
\ No newline at end of file
