Repository: sentry Updated Branches: refs/heads/master bbf5ce1fd -> b38267765
SENTRY-1940: Sentry should time out threads waiting for notifications (Alex Kolbasov, reviewd by Vamsee Yarlagadda and Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b3826776 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b3826776 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b3826776 Branch: refs/heads/master Commit: b38267765be842ca30b0bfa86f1e47bce6fdd313 Parents: bbf5ce1 Author: Alexander Kolbasov <[email protected]> Authored: Mon Sep 11 16:24:19 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Mon Sep 11 16:24:40 2017 -0700 ---------------------------------------------------------------------- .../db/service/persistent/SentryStore.java | 6 ++- .../thrift/SentryPolicyStoreProcessor.java | 6 +++ .../sentry/service/thrift/CounterWait.java | 57 ++++++++++++++++---- .../sentry/service/thrift/ServiceConstants.java | 6 +++ .../sentry/service/thrift/TestCounterWait.java | 11 +++- 5 files changed, 75 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index a70a552..01a7c83 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import javax.jdo.FetchGroup; import javax.jdo.JDODataStoreException; @@ -187,7 +188,7 @@ public class SentryStore { * <p> * Keeping it here isn't ideal but serves the purpose until we find a better home. */ - private final CounterWait counterWait = new CounterWait(); + private final CounterWait counterWait; public static Properties getDataNucleusProperties(Configuration conf) throws SentrySiteConfigurationException, IOException { @@ -267,6 +268,9 @@ public class SentryStore { pmf = JDOHelper.getPersistenceManagerFactory(prop); tm = new TransactionManager(pmf, conf); verifySentryStoreSchema(checkSchemaVersion); + long notificationTimeout = conf.getInt(ServerConfig.SENTRY_NOTIFICATION_SYNC_TIMEOUT_MS, + ServerConfig.SENTRY_NOTIFICATION_SYNC_TIMEOUT_DEFAULT); + counterWait = new CounterWait(notificationTimeout, TimeUnit.MILLISECONDS); } public void setPersistUpdateDeltas(boolean persistUpdateDeltas) { http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index cfd0e30..cd85400 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; @@ -1162,6 +1163,11 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { request.getId()); LOGGER.error(msg, e); response.setStatus(Status.RuntimeError(msg, e)); + Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + String msg = String.format("timeod out wait request for id %d", request.getId()); + LOGGER.warn(msg, e); + response.setStatus(Status.RuntimeError(msg, e)); } return response; } http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java index 2c9e87a..2268ce7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java @@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; /** @@ -33,7 +35,9 @@ import java.util.concurrent.atomic.AtomicLong; * counter reaches some value interesting to them. * Consumers call {@link #waitFor(long)} which may either return * immediately if the counter reached the specified value, or block - * until this value is reached. + * until this value is reached. Consumers can also specify timeout for the + * {@link #waitFor(long)} in which case it may return {@link TimeoutException} + * when the wait was not successfull within the specified time limit. * <p> * All waiters should be waken up when the counter becomes equal or higher * then the value they are waiting for. @@ -77,6 +81,9 @@ public final class CounterWait { /** Counter value. May only increase. */ private final AtomicLong currentId = new AtomicLong(0); + private final long waitTimeout; + private final TimeUnit waitTimeUnit; + /** * Waiters sorted by the value of the counter they are waiting for. * Note that {@link PriorityBlockingQueue} is thread-safe. @@ -87,6 +94,31 @@ public final class CounterWait { new PriorityBlockingQueue<>(); /** + * Create an instance of CounterWait object that will not timeout during wait + */ + public CounterWait() { + this(0, TimeUnit.SECONDS); + } + + /** + * Create an instance of CounterWait object that will timeout during wait + * @param waitTimeout maximum time in seconds to wait for counter + */ + public CounterWait(long waitTimeoutSec) { + this(waitTimeoutSec, TimeUnit.SECONDS); + } + + /** + * Create an instance of CounterWait object that will timeout during wait + * @param waitTimeout maximum time to wait for counter + * @param waitTimeUnit time units for wait + */ + public CounterWait(long waitTimeout, TimeUnit waitTimeUnit) { + this.waitTimeout = waitTimeout; + this.waitTimeUnit = waitTimeUnit; + } + + /** * Update the counter value and wake up all threads waiting for this * value or any value below it. * <p> @@ -149,9 +181,10 @@ public final class CounterWait { * @param value requested counter value * @return current counter value that should be no smaller then the requested * value - * @throws InterruptedException if the wait was interrupted + * @throws InterruptedException if the wait was interrupted, TimeoutException if + * wait was not successfull within the timeout value specified at the construction time. */ - public long waitFor(long value) throws InterruptedException { + public long waitFor(long value) throws InterruptedException, TimeoutException { // Fast path - counter value already reached, no need to block if (value <= currentId.get()) { return currentId.get(); @@ -235,7 +268,7 @@ public final class CounterWait { * ValueEvents are stored in priority queue sorted by value, so they should be * comparable by the value. */ - private static class ValueEvent implements Comparable<ValueEvent> { + private class ValueEvent implements Comparable<ValueEvent> { /** Value waited for. */ private final long value; /** Binary semaphore to synchronize waiters */ @@ -254,11 +287,17 @@ public final class CounterWait { } /** Wait until signaled or interrupted. May return immediately if already signalled. */ - void waitFor() throws InterruptedException { - semaphore.acquire(); + void waitFor() throws InterruptedException, TimeoutException { + if (waitTimeout == 0) { + semaphore.acquire(); + return; + } + if (!semaphore.tryAcquire(waitTimeout, waitTimeUnit)) { + throw new TimeoutException(); + } } - /** @return the value we are waiting for */ + /** @return the value we are waiting for. */ long getValue() { return value; } @@ -269,7 +308,7 @@ public final class CounterWait { } /** - * Compare objects by value + * Compare objects by value. */ @Override public int compareTo(final ValueEvent o) { @@ -279,7 +318,7 @@ public final class CounterWait { } /** - * Use identity comparison of objects + * Use identity comparison of objects. */ @Override public boolean equals(final Object o) { http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 48aec1e..280aebc 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -139,6 +139,12 @@ public class ServiceConstants { public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE = "sentry.zookeeper.client.ticketcache"; public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT = "false"; public static final String SERVER_HA_STANDBY_SIG = "sentry.ha.standby.signal"; + + // Timeout value in seconds for HMS notificationID synchronization + // Should match the value for RPC timeout in HMS client config + public static final String SENTRY_NOTIFICATION_SYNC_TIMEOUT_MS = "sentry.notification.sync.timeout.ms"; + public static final int SENTRY_NOTIFICATION_SYNC_TIMEOUT_DEFAULT = 200000; + public static final ImmutableMap<String, String> SENTRY_STORE_DEFAULTS = ImmutableMap.<String, String>builder() .put("datanucleus.connectionPoolingType", "BoneCP") http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java index e4846d9..090999a 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java @@ -25,6 +25,8 @@ import java.util.concurrent.BlockingDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Test; @@ -56,7 +58,7 @@ public class TestCounterWait { long r = 0; try { r = waiter.waitFor(val); // blocks - } catch (InterruptedException e) { + } catch (InterruptedException | TimeoutException e) { e.printStackTrace(); } outSyncQueue.add(r); // Once we wake up, post result @@ -89,6 +91,13 @@ public class TestCounterWait { executor.shutdown(); } + // Test for waitFor() timeout throwing TimeoutException + @Test(expected = TimeoutException.class) + public void testWaitForWithTimeout() throws Exception { + CounterWait waiter = new CounterWait(1, TimeUnit.MILLISECONDS); + waiter.waitFor(1); // Should throw exception + } + private void sleep(long ms) { try { Thread.sleep(ms);
