SENTRY-1695: Waiting for HMS notifications from Thrift should be interruptible (Alex Kolbasov, reviewed by: Hao Hao and Sergio Pena)
Change-Id: I28e714b99ea08ea18fe1bde6fcb67b617ea3f563 Reviewed-on: http://gerrit.sjc.cloudera.com:8080/22325 Tested-by: Jenkins User Reviewed-by: Alexander Kolbasov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ddfc9c8c Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ddfc9c8c Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ddfc9c8c Branch: refs/for/cdh5-1.5.1_ha Commit: ddfc9c8c64df1640d510d85dd20a1746f2a3553a Parents: ee48831 Author: Alexander Kolbasov <[email protected]> Authored: Thu May 4 16:51:51 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Thu May 4 18:14:40 2017 -0700 ---------------------------------------------------------------------- .../service/thrift/SentryPolicyStoreProcessor.java | 9 +++++++-- .../apache/sentry/service/thrift/CounterWait.java | 9 +++++---- .../sentry/service/thrift/TestCounterWait.java | 16 ++++++++++------ 3 files changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/ddfc9c8c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index b5aee50..56faf00 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -924,13 +924,18 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { @Override public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request) throws TException { + TSentrySyncIDResponse response = new TSentrySyncIDResponse(); try (Timer.Context timerContext = hmsWaitTimer.time()) { // Wait until Sentry Server processes specified HMS Notification ID. - TSentrySyncIDResponse response = new TSentrySyncIDResponse(); response.setId(sentryStore.getCounterWait().waitFor(request.getId())); response.setStatus(Status.OK()); - return response; + } catch (InterruptedException e) { + String msg = String.format("wait request for id %d is interrupted", + request.getId()); + LOGGER.error(msg, e); + response.setStatus(Status.RuntimeError(msg, e)); } + return response; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/sentry/blob/ddfc9c8c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java index f593bff..2b4ee84 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java @@ -117,8 +117,9 @@ public final class CounterWait { * @param value requested counter value * @return current counter value that should be no smaller then the requested * value + * @throws InterruptedException if the wait was interrupted */ - public long waitFor(long value) { + public long waitFor(long value) throws InterruptedException { // Fast path - counter value already reached, no need to block if (value <= currentId.get()) { return currentId.get(); @@ -220,9 +221,9 @@ public final class CounterWait { semaphore.acquireUninterruptibly(); // Will not block } - /** Wait until signaled. May return immediately if already signalled. */ - void waitFor() { - semaphore.acquireUninterruptibly(); + /** Wait until signaled or interrupted. May return immediately if already signalled. */ + void waitFor() throws InterruptedException { + semaphore.acquire(); } /** @return the value we are waiting for */ http://git-wip-us.apache.org/repos/asf/sentry/blob/ddfc9c8c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java index a700178..1b732da 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java @@ -30,12 +30,11 @@ import java.util.concurrent.LinkedBlockingDeque; */ public class TestCounterWait extends TestCase { // Used to verify that wakeups happen in the right order - private BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>(); - // Number of waiters to test for - private int nthreads = 20; + private final BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>(); public void testWaitFor() throws Exception { // Create a thread for each waiter + int nthreads = 20; ExecutorService executor = Executors.newFixedThreadPool(nthreads); final CounterWait waiter = new CounterWait(); @@ -46,11 +45,16 @@ public class TestCounterWait extends TestCase { // Create a pair of threads waiting for each value in [1, nthreads / 2] // We use pair of threads per value to verify that both are waken up for (int i = 0; i < nthreads; i++) { - final int finalI = i + 2; + int finalI = i + 2; final int val = finalI / 2; executor.execute(new Runnable() { public void run() { - long r = waiter.waitFor(val); // blocks + long r = 0; + try { + r = waiter.waitFor(val); // blocks + } catch (InterruptedException e) { + e.printStackTrace(); + } outSyncQueue.add(r); // Once we wake up, post result } } @@ -68,7 +72,7 @@ public class TestCounterWait extends TestCase { // Post a counter update for each value in [ 1, nthreads / 2 ] // After eac update two threads should be waken up and the corresponding pair of // values should appear in the outSyncQueue. - for (int i = 0; i < nthreads / 2; i++) { + for (int i = 0; i < (nthreads / 2); i++) { waiter.update(i + 1); long r = outSyncQueue.takeFirst(); assertEquals(r, i + 1);
