Repository: sentry Updated Branches: refs/heads/master 1275be7e3 -> c302da781
SENTRY-1929: When full HMS snapshot is created all higher notifications should be purged (Alex Kolbasov, reviewed by Vamsee Yarlaga and Na Li) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c302da78 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c302da78 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c302da78 Branch: refs/heads/master Commit: c302da78189790d48191856cfaee2db6fcf7e9c7 Parents: 1275be7 Author: Alexander Kolbasov <[email protected]> Authored: Thu Sep 7 17:00:50 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Thu Sep 7 17:09:35 2017 -0700 ---------------------------------------------------------------------- .../db/service/persistent/SentryStore.java | 53 ++++++++++++++++++-- .../sentry/service/thrift/CounterWait.java | 22 ++++++++ .../sentry/service/thrift/HMSFollower.java | 19 ++++++- .../sentry/service/thrift/TestHMSFollower.java | 2 +- 4 files changed, 89 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 1ef7dcc..a70a552 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -603,8 +603,10 @@ public class SentryStore { query.setFilter("changeID <= maxChangedIdDeleted"); query.declareParameters("long 
maxChangedIdDeleted"); long numDeleted = query.deletePersistentAll(maxIDDeleted); - LOGGER.info(String.format("Purged %d of %s to changeID=%d", - numDeleted, cls.getSimpleName(), maxIDDeleted)); + if (numDeleted > 0) { + LOGGER.info(String.format("Purged %d of %s to changeID=%d", + numDeleted, cls.getSimpleName(), maxIDDeleted)); + } } /** @@ -627,7 +629,9 @@ public class SentryStore { query.setFilter("notificationId <= maxNotificationIdDeleted"); query.declareParameters("long maxNotificationIdDeleted"); long numDeleted = query.deletePersistentAll(lastNotificationID - changesToKeep); - LOGGER.info("Purged {} of {}", numDeleted, MSentryHmsNotification.class.getSimpleName()); + if (numDeleted > 0) { + LOGGER.info("Purged {} of {}", numDeleted, MSentryHmsNotification.class.getSimpleName()); + } } /** @@ -2765,6 +2769,28 @@ public class SentryStore { } /** + * Delete all stored HMS notifications starting from given ID.<p> + * + * The purpose of the function is to clean up notifications in cases + * where we recover from HMS notifications resets. + * + * @param pm Persistent manager instance + * @param id initial ID. All notifications starting from this ID and above are + * removed. 
+ */ + private void deleteNotificationsSince(PersistenceManager pm, long id) { + Query query = pm.newQuery(MSentryHmsNotification.class); + query.addExtension(LOAD_RESULTS_AT_COMMIT, "false"); + query.setFilter("notificationId >= currentNotificationId"); + query.declareParameters("long currentNotificationId"); + long numDeleted = query.deletePersistentAll(id); + if (numDeleted > 0) { + LOGGER.info("Purged {} notification entries starting from {}", + numDeleted, id); + } + } + + /** * Persist an up-to-date HMS snapshot into Sentry DB in a single transaction with its latest * notification ID * @@ -2778,6 +2804,7 @@ public class SentryStore { new TransactionBlock() { public Object execute(PersistenceManager pm) throws Exception { pm.setDetachAllOnCommit(false); // No need to detach objects + deleteNotificationsSince(pm, notificationID + 1); // persist the notidicationID pm.makePersistent(new MSentryHmsNotification(notificationID)); @@ -3814,17 +3841,33 @@ public class SentryStore { } /** - * Set the notification ID of last processed HMS notification. + * Set the notification ID of last processed HMS notification and remove all + * subsequent notifications stored. */ - public void persistLastProcessedNotificationID(final Long notificationId) throws Exception { + public void setLastProcessedNotificationID(final Long notificationId) throws Exception { LOGGER.debug("Persisting Last Processed Notification ID {}", notificationId); tm.executeTransaction( new TransactionBlock<Object>() { public Object execute(PersistenceManager pm) throws Exception { + deleteNotificationsSince(pm, notificationId + 1); return pm.makePersistent(new MSentryHmsNotification(notificationId)); } }); } + + /** + * Set the notification ID of last processed HMS notification. 
+ */ + public void persistLastProcessedNotificationID(final Long notificationId) throws Exception { + LOGGER.debug("Persisting Last Processed Notification ID {}", notificationId); + tm.executeTransaction( + new TransactionBlock<Object>() { + public Object execute(PersistenceManager pm) throws Exception { + return pm.makePersistent(new MSentryHmsNotification(notificationId)); + } + }); + } + /** * Gets the last processed change ID for perm delta changes. * http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java index 9c9bb69..2c9e87a 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java @@ -117,6 +117,28 @@ public final class CounterWait { wakeup(newValue); } + /** + * Explicitly reset the counter value to a new value, but allow setting to a + * smaller value. + * This should be used when we have some external event that resets the counter + * value space. + * @param newValue New counter value. If this is greater than or equal to the current + * value, this is equivalent to {@link #update(long)}. Otherwise + * sets the counter to the new smaller value. 
+ */ + public synchronized void reset(long newValue) { + long oldValue = currentId.get(); + + if (newValue > oldValue) { + update(newValue); + } else if (newValue < oldValue) { + LOGGER.warn("resetting counter from {} to smaller value {}", + oldValue, newValue); + currentId.set(newValue); + // No need to wakeup waiters since no one should wait on the smaller value + } + } + /** * Wait for specified counter value. http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index c234eaf..4d83ad5 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -294,8 +294,10 @@ public class HMSFollower implements Runnable, AutoCloseable { sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), snapshotInfo.getId()); } else { // We need to persist latest notificationID for next poll - sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId()); + sentryStore.setLastProcessedNotificationID(snapshotInfo.getId()); } + // Only reset the counter if the above operations succeeded + resetCounterWait(snapshotInfo.getId()); } catch (Exception failure) { LOGGER.error("Received exception while persisting HMS path full snapshot "); throw failure; @@ -378,4 +380,19 @@ public class HMSFollower implements Runnable, AutoCloseable { counterWait.update(eventId); } } + + /** + * Reset CounterWait counter to the new value + * @param eventId new event id value, may be smaller than the old value. 
+ */ + private void resetCounterWait(long eventId) { + CounterWait counterWait = sentryStore.getCounterWait(); + + // Wake up any HMS waiters that are waiting for this ID. + // counterWait should never be null, but tests mock SentryStore and a mocked one + // doesn't have it. + if (counterWait != null) { + counterWait.reset(eventId); + } + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java index 93afb61..7d64375 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -821,7 +821,7 @@ public class TestHMSFollower { when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(fullSnapshot.getPathImage(), fullSnapshot.getId()); - verify(sentryStore, times(1)).persistLastProcessedNotificationID(fullSnapshot.getId()); + verify(sentryStore, times(1)).setLastProcessedNotificationID(fullSnapshot.getId()); verify(sentryStore, times(1)).isHmsNotificationEmpty(); verify(sentryStore, times(0)).isAuthzPathsMappingEmpty(); }
