Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 5a2322338 -> 81eed11a9
SENTRY-1669: HMSFollower should read current processed notification ID from database every time it runs (Kalyan Kalvagadda, reviewed by Sergio Pena and Alex Kolbasov) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/81eed11a Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/81eed11a Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/81eed11a Branch: refs/heads/sentry-ha-redesign Commit: 81eed11a91139cd9f4e0582e8f2daa3d839df61b Parents: 5a23223 Author: Alexander Kolbasov <[email protected]> Authored: Thu May 18 23:15:08 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Thu May 18 23:15:08 2017 -0700 ---------------------------------------------------------------------- .../service/model/MSentryHmsNotification.java | 81 ++++ .../db/service/model/MSentryPathChange.java | 1 + .../db/service/model/MSentryPermChange.java | 1 + .../provider/db/service/model/package.jdo | 9 +- .../persistent/DeltaTransactionBlock.java | 10 +- .../db/service/persistent/SentryStore.java | 41 +- .../sentry/service/thrift/HMSFollower.java | 438 +++++++++++-------- .../service/thrift/NotificationProcessor.java | 8 +- .../db/service/persistent/TestSentryStore.java | 81 +++- 9 files changed, 457 insertions(+), 213 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java new file mode 100644 index 0000000..0d54548 --- /dev/null +++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.provider.db.service.model; + +/** + * Database backend store for HMS Notification ID's. All the notifications that are processed + * by sentry are stored. + */ + +/* + * <p> HMS notification ID's are stored in separate table for three reasons</p> + * <ol> + * <li>SENTRY_PATH_CHANGE is not updated for every notification that is received from HMS. There + * are cases where HMSFollower doesn't process notifications and skip's them. Depending on + * SENTRY_PATH_CHANGE information may not provide the last notification processed.</li> + * <li> There could be cases where HMSFollower thread in multiple sentry servers acting as a + * leader and process HMS notifications. we need to avoid processing the notifications + * multiple times. 
This can be made sure by always having some number of notification + * information always regardless of purging interval.</li> + * <li>SENTRY_PATH_CHANGE information stored can typically be removed once namenode plug-in + * has processed the update.</li> + * </ol> + * <p> + * As the purpose and usage of notification ID information is different from PATH update info, + * it logically makes sense to store notification ID separately. + * </p> + */ +public class MSentryHmsNotification { + private long notificationId; + + public MSentryHmsNotification(long notificationId) { + this.notificationId = notificationId; + } + + public long getId() { + return notificationId; + } + + public void setId(long notificationId) { + this.notificationId = notificationId; + } + + @Override + public int hashCode() { + return (int) notificationId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + MSentryHmsNotification other = (MSentryHmsNotification) obj; + + return (notificationId == other.notificationId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java index 42f80aa..d11f37f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java @@ -60,6 +60,7 @@ import 
javax.jdo.annotations.PrimaryKey; public class MSentryPathChange implements MSentryChange { @PrimaryKey + //This value is auto incremented by JDO private long changeID; // Path change in JSON format. http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java index 8d9528f..1cb1a1f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java @@ -58,6 +58,7 @@ import javax.jdo.annotations.PrimaryKey; public class MSentryPermChange implements MSentryChange { @PrimaryKey + //This value is auto incremented by JDO private long changeID; // Permission change in JSON format. 
http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo index 8fd5278..96ab462 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/package.jdo @@ -297,7 +297,10 @@ <column name="CREATE_TIME_MS" jdbc-type="BIGINT"/> </field> </class> - + <class name="MSentryHmsNotification" table="SENTRY_HMS_NOTIFICATION_ID" identity-type="application" detachable="true"> + <field name="notificationId" primary-key="true"> + <column name="NOTIFICATION_ID" jdbc-type="BIGINT" allows-null="false"/> + </field> + </class> </package> -</jdo> - +</jdo> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java index 709d195..77282da 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java @@ -22,6 +22,7 @@ import 
com.google.common.base.Preconditions; import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.hdfs.PathsUpdate; import org.apache.sentry.hdfs.PermissionsUpdate; +import org.apache.sentry.provider.db.service.model.MSentryHmsNotification; import org.apache.sentry.provider.db.service.model.MSentryPathChange; import org.apache.sentry.provider.db.service.model.MSentryPermChange; import static org.apache.sentry.hdfs.Updateable.Update; @@ -84,12 +85,15 @@ public class DeltaTransactionBlock implements TransactionBlock<Object> { // changeID is trying to be persisted twice, the transaction would // fail. if (update instanceof PermissionsUpdate) { - pm.makePersistent(new MSentryPermChange((PermissionsUpdate)update)); + pm.makePersistent(new MSentryPermChange((PermissionsUpdate) update)); } else if (update instanceof PathsUpdate) { - pm.makePersistent(new MSentryPathChange((PathsUpdate)update)); + pm.makePersistent(new MSentryPathChange((PathsUpdate) update)); + // Notification id from PATH_UPDATE entry is made persistent in + // SENTRY_HMS_NOTIFICATION_ID table. 
+ pm.makePersistent(new MSentryHmsNotification(update.getSeqNum())); } else { throw new SentryInvalidInputException("Update should be type of either " + - "PermissionsUpdate or PathsUpdate.\n"); + "PermissionsUpdate or PathsUpdate.\n"); } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 29e3686..7756c4a 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -56,6 +56,7 @@ import org.apache.sentry.core.model.db.DBModelAuthorizable.AuthorizableType; import org.apache.sentry.provider.db.service.model.MAuthzPathsMapping; import org.apache.sentry.provider.db.service.model.MSentryChange; import org.apache.sentry.provider.db.service.model.MSentryGroup; +import org.apache.sentry.provider.db.service.model.MSentryHmsNotification; import org.apache.sentry.provider.db.service.model.MSentryPathChange; import org.apache.sentry.provider.db.service.model.MSentryPermChange; import org.apache.sentry.provider.db.service.model.MSentryPrivilege; @@ -122,6 +123,8 @@ public class SentryStore { public static final long EMPTY_CHANGE_ID = 0L; + public static final long EMPTY_NOTIFICATION_ID = 0L; + // For counters, representation of the "unknown value" private static final long COUNT_VALUE_UNKNOWN = -1L; @@ -3436,6 +3439,35 @@ public class SentryStore { } /** + * Gets the last processed Notification ID + * <p> + * As the table might have zero or one 
record, result of the query + * might be null OR instance of MSentryHmsNotification. + * + * @param pm the PersistenceManager + * @return EMPTY_NOTIFICATION_ID(0) when there are no notifications processed. + * else last NotificationID processed by HMSFollower + */ + static Long getLastProcessedNotificationIDCore( + PersistenceManager pm) { + Query query = pm.newQuery(MSentryHmsNotification.class); + query.setResult("max(notificationId)"); + Long notificationId = (Long) query.execute(); + return notificationId == null ? EMPTY_NOTIFICATION_ID : notificationId; + } + + /** + * Set the notification ID of last processed HMS notification. + */ + public void persistLastProcessedNotificationID(final Long notificationId) throws Exception { + tm.executeTransaction( + new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + return pm.makePersistent(new MSentryHmsNotification(notificationId)); + } + }); + } + /** * Gets the last processed change ID for perm delta changes. 
* * Internally invoke {@link #getLastProcessedChangeIDCore(PersistenceManager, Class)} @@ -3477,14 +3509,7 @@ public class SentryStore { return tm.executeTransaction( new TransactionBlock<Long>() { public Long execute(PersistenceManager pm) throws Exception { - pm.setDetachAllOnCommit(false); // No need to detach objects - long changeID = getLastProcessedChangeIDCore(pm, MSentryPathChange.class); - if (changeID == EMPTY_CHANGE_ID) { - return EMPTY_CHANGE_ID; - } else { - MSentryPathChange mSentryPathChange = getMSentryPathChangeByID(changeID); - return mSentryPathChange.getNotificationID(); - } + return getLastProcessedNotificationIDCore(pm); } }); } http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 72e9d72..59eda52 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -47,7 +47,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.sentry.binding.metastore.messaging.json.*; +import javax.jdo.JDODataStoreException; import javax.security.auth.login.LoginException; + import java.io.File; import java.io.IOException; import java.net.SocketException; @@ -71,7 +73,6 @@ import static org.apache.sentry.hdfs.Updateable.Update; @SuppressWarnings("PMD") public class HMSFollower implements Runnable, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); - private long currentEventID; // Track the latest eventId of the event that has been logged. So we don't log the same message private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID; private static boolean connectedToHMS = false; @@ -86,16 +87,24 @@ public class HMSFollower implements Runnable, AutoCloseable { private boolean needLogHMSSupportReady = true; private final LeaderStatusMonitor leaderMonitor; - HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) throws Exception { + HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) { LOGGER.info("HMSFollower is being initialized"); + Long lastProcessedNotificationID; authzConf = conf; this.leaderMonitor = leaderMonitor; sentryStore = store; - // Initialize currentEventID based on the latest persisted notification ID. - // If currentEventID is empty, need to retrieve a full hive snapshot, - currentEventID = getLastProcessedNotificationID(); - needHiveSnapshot = (currentEventID == SentryStore.EMPTY_CHANGE_ID); + try { + // Initializing lastProcessedNotificationID based on the latest persisted notification ID. 
+ lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); + } catch (Exception e) { + LOGGER.error("Failed to get the last processed notification id from sentry store, " + + "Skipping the processing", e); + needHiveSnapshot = true; + return; + } + // If lastProcessedNotificationID is empty, need to retrieve a full hive snapshot, + needHiveSnapshot = (lastProcessedNotificationID == SentryStore.EMPTY_CHANGE_ID); } @VisibleForTesting @@ -123,8 +132,8 @@ public class HMSFollower implements Runnable, AutoCloseable { * Throws @MetaException if there was a problem on creating an HMSClient */ private HiveMetaStoreClient getMetaStoreClient(Configuration conf) - throws IOException, InterruptedException, LoginException, MetaException { - if(client != null) { + throws IOException, InterruptedException, LoginException, MetaException { + if (client != null) { return client; } @@ -147,29 +156,29 @@ public class HMSFollower implements Runnable, AutoCloseable { //TODO: Is this the right(standard) way to create a HMS client? HiveMetastoreClientFactoryImpl? 
//TODO: Check if HMS is using kerberos instead of relying on Sentry conf kerberos = ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim()); + conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim()); if (kerberos) { LOGGER.info("Making a kerberos connection to HMS"); try { int port = conf.getInt(ServiceConstants.ServerConfig.RPC_PORT, ServiceConstants.ServerConfig.RPC_PORT_DEFAULT); String rawPrincipal = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.PRINCIPAL), - ServiceConstants.ServerConfig.PRINCIPAL + " is required"); + ServiceConstants.ServerConfig.PRINCIPAL + " is required"); principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr( - conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress()); - } catch(IOException io) { + conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress()); + } catch (IOException io) { throw new RuntimeException("Can't translate kerberos principal'", io); } LOGGER.info("Using kerberos principal: " + principal); final String[] principalParts = SaslRpcServer.splitKerberosName(principal); Preconditions.checkArgument(principalParts.length == 3, - "Kerberos principal should have 3 parts: " + principal); + "Kerberos principal should have 3 parts: " + principal); keytab = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.KEY_TAB), - ServiceConstants.ServerConfig.KEY_TAB + " is required"); + ServiceConstants.ServerConfig.KEY_TAB + " is required"); File keytabFile = new File(keytab); Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(), - "Keytab " + keytab + " does not exist or is not readable."); + "Keytab " + keytab + " does not exist or is not 
readable."); try { // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal. @@ -208,21 +217,24 @@ public class HMSFollower implements Runnable, AutoCloseable { @Override public void run() { - // Wake any clients connected to this service waiting for HMS already processed notifications. + Long lastProcessedNotificationID; try { - wakeUpWaitingClientsForSync(getLastProcessedNotificationID()); + // Initializing lastProcessedNotificationID based on the latest persisted notification ID. + lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); } catch (Exception e) { - LOGGER.error("Couldn't wake up HMS waiters because an error attempting to get the latest notification ID.", e); + LOGGER.error("Failed to get the last processed notification id from sentry store, " + + "Skipping the processing", e); + return; } - + // Wake any clients connected to this service waiting for HMS already processed notifications. + wakeUpWaitingClientsForSync(lastProcessedNotificationID); // Only the leader should listen to HMS updates if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { // Close any outstanding connections to HMS closeHMSConnection(); return; } - - processHiveMetastoreUpdates(); + processHiveMetastoreUpdates(lastProcessedNotificationID); } /** @@ -249,7 +261,7 @@ public class HMSFollower implements Runnable, AutoCloseable { * * Clients connections waiting for an event notification will be woken up afterwards. */ - private void processHiveMetastoreUpdates() { + private void processHiveMetastoreUpdates(Long lastProcessedNotificationID) { if (client == null) { try { client = getMetaStoreClient(authzConf); @@ -301,18 +313,24 @@ public class HMSFollower implements Runnable, AutoCloseable { if (!eventIDBefore.equals(eventIDAfter)) { LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! 
Current NotificationID = " + - eventIDAfter.toString()); + eventIDAfter.toString()); return; } LOGGER.info(String.format("Successfully fetched hive full snapshot, Current NotificationID = %s.", - eventIDAfter)); - needHiveSnapshot = false; - currentEventID = eventIDAfter.getEventId(); + eventIDAfter)); + // As eventIDAfter is the last event that was processed, eventIDAfter is used to update + // lastProcessedNotificationID instead of getting it from persistent store. + lastProcessedNotificationID = eventIDAfter.getEventId(); sentryStore.persistFullPathsImage(pathsFullSnapshot); - + needHiveSnapshot = false; + sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId()); // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value. - wakeUpWaitingClientsForSync(currentEventID); + wakeUpWaitingClientsForSync(lastProcessedNotificationID); + } else { + // Every time HMSFollower is scheduled to run, value should be updated based + // on the value stored in database. + lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); } // HMSFollower connected to HMS and it finished full snapshot if that was required @@ -326,15 +344,16 @@ public class HMSFollower implements Runnable, AutoCloseable { // NotificationEventResponse causing TProtocolException. // Workaround: Only processes the notification events newer than the last updated one. 
CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); - if (eventId.getEventId() > currentEventID) { - NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); + if (eventId.getEventId() > lastProcessedNotificationID) { + NotificationEventResponse response = + client.getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null); if (response.isSetEvents()) { if (!response.getEvents().isEmpty()) { - if (currentEventID != lastLoggedEventId) { + if (lastProcessedNotificationID != lastLoggedEventId) { // Only log when there are updates and the notification ID has changed. - LOGGER.debug(String.format("CurrentEventID = %s. Processing %s events", - currentEventID, response.getEvents().size())); - lastLoggedEventId = currentEventID; + LOGGER.debug(String.format("lastProcessedNotificationID = %s. Processing %s events", + lastProcessedNotificationID, response.getEvents().size())); + lastLoggedEventId = lastProcessedNotificationID; } processNotificationEvents(response.getEvents()); @@ -349,13 +368,13 @@ public class HMSFollower implements Runnable, AutoCloseable { } else { LOGGER.error("ThriftException occured fetching Notification entries, will try", e); } - } catch (SentryInvalidInputException |SentryInvalidHMSEventException e) { + } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) { LOGGER.error("Encounter SentryInvalidInputException|SentryInvalidHMSEventException " + - "while processing notification log", e); + "while processing notification log", e); } catch (Throwable t) { // catching errors to prevent the executor to halt. LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(), - t.getCause()); + t.getCause()); t.printStackTrace(); } } @@ -383,6 +402,7 @@ public class HMSFollower implements Runnable, AutoCloseable { /** * Retrieve a Hive full snapshot from HMS. + * * @return HMS snapshot. 
Snapshot consists of a mapping from auth object name * to the set of paths corresponding to that name. * @throws InterruptedException @@ -390,7 +410,7 @@ public class HMSFollower implements Runnable, AutoCloseable { * @throws ExecutionException */ private Map<String, Set<String>> fetchFullUpdate() - throws InterruptedException, TException, ExecutionException { + throws InterruptedException, TException, ExecutionException { LOGGER.info("Request full HMS snapshot"); try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) { Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); @@ -399,16 +419,6 @@ public class HMSFollower implements Runnable, AutoCloseable { } } - /** - * Get the last processed eventID from Sentry DB. - * - * @return the stored currentID - * @throws Exception - */ - private long getLastProcessedNotificationID() throws Exception { - return sentryStore.getLastProcessedNotificationID(); - } - private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { return "true" .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), syncConfVar.getDefault()))); @@ -420,6 +430,7 @@ public class HMSFollower implements Runnable, AutoCloseable { void processNotificationEvents(List<NotificationEvent> events) throws Exception { SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer(); + boolean isNotificationProcessingSkipped = false; for (NotificationEvent event : events) { String dbName; String tableName; @@ -428,172 +439,221 @@ public class HMSFollower implements Runnable, AutoCloseable { String location; List<String> locations; NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER); - switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { - case CREATE_DATABASE: - SentryJSONCreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(event.getMessage()); - dbName = message.getDB(); - location = 
message.getLocation(); - if ((dbName == null) || (location == null)) { - throw new SentryInvalidHMSEventException(String.format("Create database event " + - "has incomplete information. dbName = %s location = %s", + try { + switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { + case CREATE_DATABASE: + SentryJSONCreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(event.getMessage()); + dbName = message.getDB(); + location = message.getLocation(); + if ((dbName == null) || (location == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Create database event " + + "has incomplete information. dbName = %s location = %s", StringUtils.defaultIfBlank(dbName, "null"), StringUtils.defaultIfBlank(location, "null"))); - } - if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { - dropSentryDbPrivileges(dbName, event); - } - notificationProcessor.processCreateDatabase(dbName,location, event.getEventId()); - break; - case DROP_DATABASE: - SentryJSONDropDatabaseMessage dropDatabaseMessage = + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { + dropSentryDbPrivileges(dbName, event); + } + notificationProcessor.processCreateDatabase(dbName, location, event.getEventId()); + break; + case DROP_DATABASE: + SentryJSONDropDatabaseMessage dropDatabaseMessage = deserializer.getDropDatabaseMessage(event.getMessage()); - dbName = dropDatabaseMessage.getDB(); - location = dropDatabaseMessage.getLocation(); - if (dbName == null) { - throw new SentryInvalidHMSEventException( - "Drop database event has incomplete information: dbName = null"); - } - if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { - dropSentryDbPrivileges(dbName, event); - } - notificationProcessor.processDropDatabase(dbName, location, event.getEventId()); - break; - case CREATE_TABLE: - SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage()); - dbName = 
createTableMessage.getDB(); - tableName = createTableMessage.getTable(); - location = createTableMessage.getLocation(); - if ((dbName == null) || (tableName == null) || (location == null)) { - throw new SentryInvalidHMSEventException(String.format("Create table event " + - "has incomplete information. dbName = %s, tableName = %s, location = %s", + dbName = dropDatabaseMessage.getDB(); + location = dropDatabaseMessage.getLocation(); + if (dbName == null) { + isNotificationProcessingSkipped = true; + LOGGER.error("Drop database event has incomplete information: dbName = null"); + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { + dropSentryDbPrivileges(dbName, event); + } + notificationProcessor.processDropDatabase(dbName, location, event.getEventId()); + break; + case CREATE_TABLE: + SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage()); + dbName = createTableMessage.getDB(); + tableName = createTableMessage.getTable(); + location = createTableMessage.getLocation(); + if ((dbName == null) || (tableName == null) || (location == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Create table event " + "has incomplete information." 
+ + " dbName = %s, tableName = %s, location = %s", StringUtils.defaultIfBlank(dbName, "null"), StringUtils.defaultIfBlank(tableName, "null"), StringUtils.defaultIfBlank(location, "null"))); - } - if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { - dropSentryTablePrivileges(dbName, tableName, event); - } - notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId()); - break; - case DROP_TABLE: - SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage()); - dbName = dropTableMessage.getDB(); - tableName = dropTableMessage.getTable(); - if ((dbName == null) || (tableName == null)) { - throw new SentryInvalidHMSEventException(String.format("Drop table event " + - "has incomplete information. dbName = %s, tableName = %s", + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { + dropSentryTablePrivileges(dbName, tableName, event); + } + notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId()); + break; + case DROP_TABLE: + SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage()); + dbName = dropTableMessage.getDB(); + tableName = dropTableMessage.getTable(); + if ((dbName == null) || (tableName == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Drop table event " + + "has incomplete information. 
dbName = %s, tableName = %s", StringUtils.defaultIfBlank(dbName, "null"), StringUtils.defaultIfBlank(tableName, "null"))); - } - if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { - dropSentryTablePrivileges(dbName, tableName, event); - } - notificationProcessor.processDropTable(dbName, tableName, event.getEventId()); - break; - case ALTER_TABLE: - SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage()); - - String oldDbName = alterTableMessage.getDB(); - String oldTableName = alterTableMessage.getTable(); - String newDbName = event.getDbName(); - String newTableName = event.getTableName(); - oldLocation = alterTableMessage.getOldLocation(); - newLocation = alterTableMessage.getNewLocation(); - - if ((oldDbName == null) || + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { + dropSentryTablePrivileges(dbName, tableName, event); + } + notificationProcessor.processDropTable(dbName, tableName, event.getEventId()); + break; + case ALTER_TABLE: + SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage()); + + String oldDbName = alterTableMessage.getDB(); + String oldTableName = alterTableMessage.getTable(); + String newDbName = event.getDbName(); + String newTableName = event.getTableName(); + oldLocation = alterTableMessage.getOldLocation(); + newLocation = alterTableMessage.getNewLocation(); + + if ((oldDbName == null) || (oldTableName == null) || (newDbName == null) || (newTableName == null) || (oldLocation == null) || (newLocation == null)) { - throw new SentryInvalidHMSEventException(String.format("Alter table event " + - "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " + - "newDbName = %s, newTableName = %s, newLocation = %s", + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Alter table event " + + "has incomplete information. 
oldDbName = %s, oldTableName = %s, oldLocation = %s, " + + "newDbName = %s, newTableName = %s, newLocation = %s", StringUtils.defaultIfBlank(oldDbName, "null"), StringUtils.defaultIfBlank(oldTableName, "null"), StringUtils.defaultIfBlank(oldLocation, "null"), StringUtils.defaultIfBlank(newDbName, "null"), StringUtils.defaultIfBlank(newTableName, "null"), StringUtils.defaultIfBlank(newLocation, "null"))); - } + break; + } else if ((oldDbName == newDbName) && + (oldTableName == newTableName) && + (oldLocation == newLocation)) { + isNotificationProcessingSkipped = true; + LOGGER.info(String.format("Alter table notification ignored as neither name nor " + + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + + "newLocation = %s", oldDbName + "." + oldTableName , oldLocation, + newDbName + "." + newTableName, newLocation)); + break; + } - if(!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { - // Name has changed - try { - renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s", + if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { + // Name has changed + try { + renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s", oldDbName, oldTableName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Alter table event. Event: " + event.toString(), e); + } catch (Exception e) { + isNotificationProcessingSkipped = true; + LOGGER.info("Could not process Alter table event. 
Event: " + event.toString(), e); + break; + } } - } - notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName, + notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName, newTableName, oldLocation, newLocation, event.getEventId()); - break; - case ADD_PARTITION: - SentryJSONAddPartitionMessage addPartitionMessage = - deserializer.getAddPartitionMessage(event.getMessage()); - dbName = addPartitionMessage.getDB(); - tableName = addPartitionMessage.getTable(); - locations = addPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - LOGGER.error(String.format("Create table event has incomplete information. " + - "dbName = %s, tableName = %s, locations = %s", + break; + case ADD_PARTITION: + SentryJSONAddPartitionMessage addPartitionMessage = + deserializer.getAddPartitionMessage(event.getMessage()); + dbName = addPartitionMessage.getDB(); + tableName = addPartitionMessage.getTable(); + locations = addPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Create table event has incomplete information. " + + "dbName = %s, tableName = %s, locations = %s", StringUtils.defaultIfBlank(dbName, "null"), StringUtils.defaultIfBlank(tableName, "null"), locations != null ? locations.toString() : "null")); - } - notificationProcessor.processAddPartition(dbName, tableName, locations, - event.getEventId()); - break; - case DROP_PARTITION: - SentryJSONDropPartitionMessage dropPartitionMessage = - deserializer.getDropPartitionMessage(event.getMessage()); - dbName = dropPartitionMessage.getDB(); - tableName = dropPartitionMessage.getTable(); - locations = dropPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - throw new SentryInvalidHMSEventException(String.format("Drop partition event " + - "has incomplete information. 
dbName = %s, tableName = %s, location = %s", + break; + } + notificationProcessor.processAddPartition(dbName, tableName, locations, event.getEventId()); + break; + case DROP_PARTITION: + SentryJSONDropPartitionMessage dropPartitionMessage = + deserializer.getDropPartitionMessage(event.getMessage()); + dbName = dropPartitionMessage.getDB(); + tableName = dropPartitionMessage.getTable(); + locations = dropPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Drop partition event " + + "has incomplete information. dbName = %s, tableName = %s, location = %s", StringUtils.defaultIfBlank(dbName, "null"), StringUtils.defaultIfBlank(tableName, "null"), locations != null ? locations.toString() : "null")); - } - notificationProcessor.processDropPartition(dbName, tableName, locations, - event.getEventId()); - break; - case ALTER_PARTITION: - SentryJSONAlterPartitionMessage alterPartitionMessage = + break; + } + notificationProcessor.processDropPartition(dbName, tableName, locations, event.getEventId()); + + break; + case ALTER_PARTITION: + SentryJSONAlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(event.getMessage()); - dbName = alterPartitionMessage.getDB(); - tableName = alterPartitionMessage.getTable(); - oldLocation = alterPartitionMessage.getOldLocation(); - newLocation = alterPartitionMessage.getNewLocation(); - - if ((dbName == null) || - (tableName == null) || - (oldLocation == null) || - (newLocation == null)) { - throw new SentryInvalidHMSEventException(String.format("Alter partition event " + - "has incomplete information. 
dbName = %s, tableName = %s, " + - "oldLocation = %s, newLocation = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - } + dbName = alterPartitionMessage.getDB(); + tableName = alterPartitionMessage.getTable(); + oldLocation = alterPartitionMessage.getOldLocation(); + newLocation = alterPartitionMessage.getNewLocation(); + + if ((dbName == null) || + (tableName == null) || + (oldLocation == null) || + (newLocation == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Alter partition event " + + "has incomplete information. dbName = %s, tableName = %s, " + + "oldLocation = %s, newLocation = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + break; + } else if (oldLocation == newLocation) { + isNotificationProcessingSkipped = true; + LOGGER.info(String.format("Alter partition notification ignored as" + + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." + + "." + tableName, oldLocation)); + break; + } - notificationProcessor.processAlterPartition(dbName, tableName, oldLocation, - newLocation, event.getEventId()); - break; - case INSERT: - // TODO DO we need to do anything here? - break; + notificationProcessor.processAlterPartition(dbName, tableName, oldLocation, + newLocation, event.getEventId()); + break; + case INSERT: + // TODO DO we need to do anything here? + break; + } + } catch (Exception e) { + if (e.getCause() instanceof JDODataStoreException) { + LOGGER.info("Received JDO Storage Exception, Could be because of processing " + + "duplicate notification"); + if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { + // Rest of the notifications need not be processed. 
+ throw e; + } + } + sentryStore.persistLastProcessedNotificationID(event.getEventId()); + } + if (isNotificationProcessingSkipped) { + // Update the notification ID in the persistent store even when the notification is + // not processed as the content in the notification is not valid. + // Continue processing the next notification. + sentryStore.persistLastProcessedNotificationID(event.getEventId()); + isNotificationProcessingSkipped = false; } - currentEventID = event.getEventId(); // Wake up any HMS waiters that are waiting for this ID. - wakeUpWaitingClientsForSync(currentEventID); + wakeUpWaitingClientsForSync(event.getEventId()); } } @@ -606,7 +666,7 @@ public class HMSFollower implements Runnable, AutoCloseable { LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName); } catch (Exception e) { throw new SentryInvalidInputException("Could not process Drop database event." + - "Event: " + event.toString(), e); + "Event: " + event.toString(), e); } } @@ -624,7 +684,7 @@ public class HMSFollower implements Runnable, AutoCloseable { } private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws - Exception { + Exception { TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(hiveInstance); oldAuthorizable.setDb(oldDbName); oldAuthorizable.setTable(oldTableName); @@ -632,7 +692,7 @@ public class HMSFollower implements Runnable, AutoCloseable { newAuthorizable.setDb(newDbName); newAuthorizable.setTable(newTableName); Update update = - onRenameSentryPrivilege(oldAuthorizable, newAuthorizable); + onRenameSentryPrivilege(oldAuthorizable, newAuthorizable); sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); } @@ -646,8 +706,8 @@ public class HMSFollower implements Runnable, AutoCloseable { @VisibleForTesting static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable, - TSentryAuthorizable newAuthorizable) - throws 
SentryPolicyStorePlugin.SentryPluginException { + TSentryAuthorizable newAuthorizable) + throws SentryPolicyStorePlugin.SentryPluginException { String oldAuthz = getAuthzObj(oldAuthorizable); String newAuthz = getAuthzObj(newAuthorizable); PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java index 9f4cfe8..de8e2f7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java @@ -32,6 +32,10 @@ import java.util.List; * NotificationProcessor processes various notification events generated from * the Hive MetaStore state change, and applies these changes on the complete * HMS Paths snapshot or delta update stored in Sentry using SentryStore. + * <p> + * NotificationProcessor should not skip processing notification events for any reason. + * If some notification events are to be skipped, appropriate logic should be added in + * HMSFollower before invoking NotificationProcessor. 
*/ class NotificationProcessor { @@ -354,10 +358,6 @@ class NotificationProcessor { Collections.singleton(newPathTree), update); } - } else { - LOGGER.info(String.format("Alter table notification ignored as neither name nor " + - "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + - "newLocation = %s", oldAuthzObj, oldLocation, newAuthzObj, newLocation)); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/81eed11a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index e7443eb..440b0e9 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -75,6 +75,8 @@ import org.slf4j.LoggerFactory; import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder.newQueryParamBuilder; +import javax.jdo.JDODataStoreException; + public class TestSentryStore extends org.junit.Assert { private static final Logger LOGGER = LoggerFactory.getLogger(TestSentryStore.class); @@ -2460,11 +2462,13 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testAddDeleteAuthzPathsMapping() throws Exception { // Add "db1.table1" authzObj - PathsUpdate addUpdate = new PathsUpdate(0, false); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + PathsUpdate addUpdate = new PathsUpdate(1, false); addUpdate.newPathChange("db1.table"). addToAddPaths(Arrays.asList("db1", "tbl1")); addUpdate.newPathChange("db1.table"). 
addToAddPaths(Arrays.asList("db1", "tbl2")); + sentryStore.addAuthzPathsMapping("db1.table", Sets.newHashSet("db1/tbl1", "db1/tbl2"), addUpdate); PathsImage pathsImage = sentryStore.retrieveFullPathsImage(); @@ -2477,9 +2481,11 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange addPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(addUpdate.JSONSerialize(), addPathChange.getPathChange()); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(1, lastNotificationId.longValue()); // Delete path 'db1.db/tbl1' from "db1.table1" authzObj. - PathsUpdate delUpdate = new PathsUpdate(1, false); + PathsUpdate delUpdate = new PathsUpdate(2, false); delUpdate.newPathChange("db1.table") .addToDelPaths(Arrays.asList("db1", "tbl1")); sentryStore.deleteAuthzPathsMapping("db1.table", Sets.newHashSet("db1/tbl1"), delUpdate); @@ -2492,9 +2498,11 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange delPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(delUpdate.JSONSerialize(), delPathChange.getPathChange()); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(2, lastNotificationId.longValue()); // Delete "db1.table" authzObj from the authzObj -> [Paths] mapping. 
- PathsUpdate delAllupdate = new PathsUpdate(2, false); + PathsUpdate delAllupdate = new PathsUpdate(3, false); delAllupdate.newPathChange("db1.table") .addToDelPaths(Lists.newArrayList(PathsUpdate.ALL_PATHS)); sentryStore.deleteAllAuthzPathsMapping("db1.table", delAllupdate); @@ -2506,11 +2514,16 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange delAllPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(delAllupdate.JSONSerialize(), delAllPathChange.getPathChange()); + + lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(3, lastNotificationId.longValue()); + } @Test public void testRenameUpdateAuthzPathsMapping() throws Exception { Map<String, Set<String>> authzPaths = new HashMap<>(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/table1/p1")); authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2")); @@ -2518,8 +2531,9 @@ public class TestSentryStore extends org.junit.Assert { Map<String, Set<String>> pathsImage = sentryStore.retrieveFullPathsImage().getPathImage(); assertEquals(2, pathsImage.size()); + // Rename path of 'db1.table1' from 'db1.table1' to 'db1.newTable1' - PathsUpdate renameUpdate = new PathsUpdate(0, false); + PathsUpdate renameUpdate = new PathsUpdate(1, false); renameUpdate.newPathChange("db1.table1") .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "table1")); renameUpdate.newPathChange("db1.newTable1") @@ -2538,9 +2552,10 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); - + 
lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(1, lastNotificationId.longValue()); // Rename 'db1.table1' to "db1.table2" but did not change its location. - renameUpdate = new PathsUpdate(1, false); + renameUpdate = new PathsUpdate(2, false); renameUpdate.newPathChange("db1.newTable1") .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1")); renameUpdate.newPathChange("db1.newTable2") @@ -2553,6 +2568,8 @@ public class TestSentryStore extends org.junit.Assert { assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1", "user/hive/warehouse/db1.db/newTable1"), pathsImage.get("db1.newTable2")); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(2, lastNotificationId.longValue()); // Query the persisted path change and ensure it equals to the original one lastChangeID = sentryStore.getLastProcessedPathChangeID(); @@ -2581,6 +2598,8 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange updatePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(update.JSONSerialize(), updatePathChange.getPathChange()); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(3, lastNotificationId.longValue()); } @Test @@ -2964,4 +2983,54 @@ public class TestSentryStore extends org.junit.Assert { prevId = changeId; } } + + @Test + public void testDuplicateNotification() throws Exception { + Map<String, Set<String>> authzPaths = new HashMap<>(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", + "user/hive/warehouse/db1.db/table1/p1")); + authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2")); + sentryStore.persistFullPathsImage(authzPaths); + Map<String, Set<String>> pathsImage = 
sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(2, pathsImage.size()); + + if (lastNotificationId == null) { + lastNotificationId = SentryStore.EMPTY_NOTIFICATION_ID; + } + + // Rename path of 'db1.table1' from 'db1.table1' to 'db1.newTable1' + PathsUpdate renameUpdate = new PathsUpdate(1, false); + renameUpdate.newPathChange("db1.table1") + .addToDelPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "table1")); + renameUpdate.newPathChange("db1.newTable1") + .addToAddPaths(Arrays.asList("user", "hive", "warehouse", "db1.db", "newTable1")); + sentryStore.renameAuthzPathsMapping("db1.table1", "db1.newTable1", + "user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/newTable1", renameUpdate); + pathsImage = sentryStore.retrieveFullPathsImage().getPathImage(); + assertEquals(2, pathsImage.size()); + assertEquals(3, sentryStore.getMPaths().size()); + assertTrue(pathsImage.containsKey("db1.newTable1")); + assertEquals(Sets.newHashSet("user/hive/warehouse/db1.db/table1/p1", + "user/hive/warehouse/db1.db/newTable1"), + pathsImage.get("db1.newTable1")); + + // Query the persisted path change and ensure it equals the original one + long lastChangeID = sentryStore.getLastProcessedPathChangeID(); + MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); + assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); + assertEquals(1, lastNotificationId.longValue()); + + + // Process the notification a second time + try { + sentryStore.renameAuthzPathsMapping("db1.table1", "db1.newTable1", + "user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/newTable1", renameUpdate); + } catch (Exception e) { + if (!(e.getCause() instanceof JDODataStoreException)) { + fail("Unexpected failure occured while processing duplicate notification"); + } + } + } }
