Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 1c6ba5ebe -> aecf3f281
SENTRY-1601 Implement HMS Notification barrier on the server side (Alex Kolbasov, Reviewed by: Misha Dmitriev, Vadim Spector) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/aecf3f28 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/aecf3f28 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/aecf3f28 Branch: refs/heads/sentry-ha-redesign Commit: aecf3f281e4c87feb4063d87e02373f4ae747a1b Parents: 1c6ba5e Author: Alexander Kolbasov <[email protected]> Authored: Fri Mar 3 21:54:55 2017 -0800 Committer: Alexander Kolbasov <[email protected]> Committed: Fri Mar 3 21:54:55 2017 -0800 ---------------------------------------------------------------------- .../db/service/persistent/SentryStore.java | 28 ++ .../db/service/thrift/SentryMetrics.java | 1 + .../thrift/SentryPolicyStoreProcessor.java | 17 +- .../sentry/service/thrift/CounterWait.java | 266 +++++++++++++++++++ .../sentry/service/thrift/HMSFollower.java | 9 +- .../sentry/service/thrift/TestCounterWait.java | 90 +++++++ 6 files changed, 407 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index c1186ba..38f68cd 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -74,6 +74,7 @@ import org.apache.sentry.provider.db.service.thrift.TSentryMappingData; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap; import org.apache.sentry.provider.db.service.thrift.TSentryRole; +import org.apache.sentry.service.thrift.CounterWait; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.datanucleus.store.rdbms.exceptions.MissingTableException; @@ -143,6 +144,17 @@ public class SentryStore { private Thread privCleanerThread = null; private final TransactionManager tm; + /** + * counterWait is used to synchronize notifications between Thrift and HMSFollower. + * Technically it doesn't belong here, but the only thing that connects HMSFollower + * and Thrift API is SentryStore. An alternative could be a singleton CounterWait or + * some factory that returns CounterWait instances keyed by name, but this complicates + * things unnecessary. + * <p> + * Keeping it here isn't ideal but serves the purpose until we find a better home. + */ + private final CounterWait counterWait = new CounterWait(); + public static Properties getDataNucleusProperties(Configuration conf) throws SentrySiteConfigurationException, IOException { Properties prop = new Properties(); @@ -237,6 +249,10 @@ public class SentryStore { return tm; } + public CounterWait getCounterWait() { + return counterWait; + } + // ensure that the backend DB schema is set void verifySentryStoreSchema(boolean checkVersion) throws Exception { if (!checkVersion) { @@ -420,6 +436,18 @@ public class SentryStore { } /** + * @return number of threads waiting for HMS notifications to be processed + */ + public Gauge<Integer> getHMSWaitersCountGauge() { + return new Gauge<Integer>() { + @Override + public Integer getValue() { + return counterWait.waitersCount(); + } + }; + } + + /** * Lets the test code know how many privs are in the db, so that we know * if they are in fact being cleaned up when not being referenced any more. * @return The number of rows in the db priv table. http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java index 3f7542c..a359a04 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java @@ -121,6 +121,7 @@ public final class SentryMetrics { addGauge(SentryStore.class, "privilege_count", sentryStore.getPrivilegeCountGauge()); addGauge(SentryStore.class, "group_count", sentryStore.getGroupCountGauge()); + addGauge(SentryStore.class, "hms.waiters", sentryStore.getHMSWaitersCountGauge()); gaugesAdded = true; } } http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index 30e91ae..2ba3d38 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -60,6 +60,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; +import static com.codahale.metrics.MetricRegistry.name; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; @@ -82,7 +84,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { private final SentryStore sentryStore; private final NotificationHandlerInvoker notificationHandlerInvoker; private final ImmutableSet<String> adminGroups; - SentryMetrics sentryMetrics; + private SentryMetrics sentryMetrics; + private final Timer hmsWaitTimer = + SentryMetrics.getInstance(). + getTimer(name(SentryPolicyStoreProcessor.class, "hms", "wait")); private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>(); @@ -1142,7 +1147,13 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { } @Override - public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request) throws TException { - throw new UnsupportedOperationException("sentry_sync_notifications"); + public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request) + throws TException { + try (Timer.Context timerContext = hmsWaitTimer.time()) { + // Wait until Sentry Server processes specified HMS Notification ID. + TSentrySyncIDResponse response = new TSentrySyncIDResponse(); + response.setId(sentryStore.getCounterWait().waitFor(request.getId())); + return response; + } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java new file mode 100644 index 0000000..f593bff --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import org.apache.http.annotation.ThreadSafe; + +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Waiting for counter to reach certain value. + * The counter starts from zero and its value increases over time. + * The class allows for multiple consumers waiting until the value of the + * counter reaches some value interesting to them. + * Consumers call {@link #waitFor(long)} which may either return + * immediately if the counter reached the specified value, or block + * until this value is reached. + * <p> + * All waiters should be waken up when the counter becomes equal or higher + * then the value they are waiting for. + * <p> + * The counter is updated by a single updater that should only increase the + * counter value. + * The updater calls the {@link #update(long)} method to update the counter + * value and this should wake up all threads waiting for any value smaller or + * equal to the new one. + * <p> + * The class is thread-safe. + * It is designed for use by multiple waiter threads and a single + * updater thread, but it will work correctly even in the presence of multiple + * updater threads. + */ +@ThreadSafe +public final class CounterWait { + // Implementation notes. + // + // The implementation is based on: + // + // 1) Using an atomic counter value which guarantees consistency. + // Since everyone needs only to know when the counter value reached the + // certain value and the counter may only increase its value, + // it is safe to update the counter by another thread after its value + // was read. + // + // 2) Priority queue of waiters, sorted by their expected values. The smallest + // value is always at the top of the queue. The priority queue itself + // is thread-safe, so no locks are needed to protect access to it. + // + // Each waiter is implemented using a binary semaphore. + // This solves the problem of a wakeup that happens before the sleep - + // in this case the acquire() doesn't block and returns immediately. + // + // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe, + // we are not using its blocking queue semantics. + + /** Counter value. May only increase. */ + private final AtomicLong currentId = new AtomicLong(0); + + /** + * Waiters sorted by the value of the counter they are waiting for. + * Note that {@link PriorityBlockingQueue} is thread-safe. + * We are not using this as a blocking queue, but as a synchronized + * PriorityQueue. + */ + private final PriorityBlockingQueue<ValueEvent> waiters = + new PriorityBlockingQueue<>(); + + /** + * Update the counter value and wake up all threads waiting for this + * value or any value below it. + * <p> + * The counter value should only increase. + * An attempt to decrease the value is raising + * {@link IllegalArgumentException}. + * The usual case is to have a single updater thread, but we enforce this + * by synchronizing the call. + * + * @param newValue the new counter value + */ + public synchronized void update(long newValue) { + // Make sure the counter is never decremented + if (newValue < currentId.get()) { + throw new IllegalArgumentException("new counter value " + + String.valueOf(newValue) + + "is smaller then the previous one " + currentId); + } + currentId.set(newValue); + + // Wake up any threads waiting for a counter to reach this value. + wakeup(newValue); + } + + + /** + * Wait for specified counter value. + * Returns immediately if the value is reached or blocks until the value + * is reached. + * Multiple threads can call the method concurrently. + * + * @param value requested counter value + * @return current counter value that should be no smaller then the requested + * value + */ + public long waitFor(long value) { + // Fast path - counter value already reached, no need to block + if (value <= currentId.get()) { + return currentId.get(); + } + + // Enqueue the waiter for this value + ValueEvent eid = new ValueEvent(value); + waiters.put(eid); + + // It is possible that between the fast path check and the time the + // value event is enqueued, the counter value already reached the requested + // value. In this case we return immediately. + if (value <= currentId.get()) { + return currentId.get(); + } + + // At this point we may be sure that by the time the event was enqueued, + // the counter was below the requested value. This means that update() + // is guaranteed to wake us up when the counter reaches the requested value. + // The wake up may actually happen before we start waiting, in this case + // the event's blocking queue will be non-empty and the waitFor() below + // will not block, so it is safe to wake up before the wait. + // So sit tight and wait patiently. + eid.waitFor(); + return currentId.get(); + } + + /** + * Wake up any threads waiting for a counter to reach specified value + * Peek at the top of the queue. If the queue is empty or the top value + * exceeds the current value, we are done. Otherwise wakeup the top thread, + * remove the corresponding waiter and continue. + * <p> + * Note that the waiter may be removed under our nose by + * {@link #waitFor(long)} method, but this is Ok - in this case + * waiters.remove() will just return false. + * + * @param value current counter value + */ + private void wakeup(long value) { + while (true) { + // Get the top of the waiters queue or null if it is empty + ValueEvent e = waiters.poll(); + if (e == null) { + // Queue is empty - return. + return; + } + // No one to wake up, return event to the queue and exit + if (e.getValue() > value) { + waiters.add(e); + return; + } + // Due for wake-up call + e.wakeup(); + } + } + + // Useful for debugging + @Override + public String toString() { + return "CounterWait{" + "currentId=" + currentId + + ", waiters=" + waiters + "}"; + } + + /** + * Return number of waiters. This is mostly useful for metrics/debugging + * + * @return number of sleeping waiters + */ + public int waitersCount() { + return waiters.size(); + } + + /** + * Representation of the waiting event. + * The waiting event consists of the expected value and a binary semaphore. + * <p> + * Each thread waiting for the given value, creates a ValueEvent and tries + * to acquire a semaphore. This blocks until the semaphore is released. + * <p> + * ValueEvents are stored in priority queue sorted by value, so they should be + * comparable by the value. + */ + private static class ValueEvent implements Comparable<ValueEvent> { + /** Value waited for. */ + private final long value; + /** Binary semaphore to synchronize waiters */ + private final Semaphore semaphore = new Semaphore(1); + + /** + * Instantiates a new Value event. + * + * @param v the expected value + */ + ValueEvent(long v) { + this.value = v; + // Acquire the semaphore. Subsequent calls to waitFor() will block until + // wakeup() releases the semaphore. + semaphore.acquireUninterruptibly(); // Will not block + } + + /** Wait until signaled. May return immediately if already signalled. */ + void waitFor() { + semaphore.acquireUninterruptibly(); + } + + /** @return the value we are waiting for */ + long getValue() { + return value; + } + + /** Wakeup the waiting thread. */ + void wakeup() { + semaphore.release(); + } + + /** + * Compare objects by value + */ + @Override + public int compareTo(final ValueEvent o) { + return value == o.value ? 0 + : value < o.value ? -1 + : 1; + } + + /** + * Use identity comparison of objects + */ + @Override + public boolean equals(final Object o) { + return (this == o); + } + + @Override + public int hashCode() { + return (int) (value ^ (value >>> 32)); + } + + @Override + public String toString() { + return String.valueOf(value); + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 8b07f5b..f3f51da 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -345,6 +345,7 @@ public class HMSFollower implements Runnable { void processNotificationEvents(List<NotificationEvent> events) throws SentryInvalidHMSEventException, SentryInvalidInputException { SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer(); + final CounterWait counterWait = sentryStore.getCounterWait(); for (NotificationEvent event : events) { String dbName, tableName, oldLocation, newLocation, location; @@ -479,7 +480,13 @@ public class HMSFollower implements Runnable { //TODO: Handle HDFS plugin break; } - currentEventID = event.getEventId(); + currentEventID = event.getEventId(); + // Wake up any HMS waiters that are waiting for this ID. + // counterWait should never be null, but tests mock SentryStore and a mocked one + // doesn't have it. + if (counterWait != null) { + counterWait.update(currentEventID); + } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java new file mode 100644 index 0000000..a700178 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import junit.framework.TestCase; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Test for CounterWait class + */ +public class TestCounterWait extends TestCase { + // Used to verify that wakeups happen in the right order + private BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>(); + // Number of waiters to test for + private int nthreads = 20; + + public void testWaitFor() throws Exception { + // Create a thread for each waiter + ExecutorService executor = Executors.newFixedThreadPool(nthreads); + + final CounterWait waiter = new CounterWait(); + + // Initial value is zero, so this shouldn't block + assertEquals(0, waiter.waitFor(0)); + + // Create a pair of threads waiting for each value in [1, nthreads / 2] + // We use pair of threads per value to verify that both are waken up + for (int i = 0; i < nthreads; i++) { + final int finalI = i + 2; + final int val = finalI / 2; + executor.execute(new Runnable() { + public void run() { + long r = waiter.waitFor(val); // blocks + outSyncQueue.add(r); // Once we wake up, post result + } + } + ); + } + + // Wait until all threads are asleep. + while(waiter.waitersCount() < nthreads) { + sleep(20); + } + + // All threads should be blocked, so outSyncQueue should be empty + assertTrue(outSyncQueue.isEmpty()); + + // Post a counter update for each value in [ 1, nthreads / 2 ] + // After eac update two threads should be waken up and the corresponding pair of + // values should appear in the outSyncQueue. + for (int i = 0; i < nthreads / 2; i++) { + waiter.update(i + 1); + long r = outSyncQueue.takeFirst(); + assertEquals(r, i + 1); + r = outSyncQueue.takeFirst(); + assertEquals(r, i + 1); + assertTrue(outSyncQueue.isEmpty()); + } + + // We are done + executor.shutdown(); + } + + private void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + } + } +}
