Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 5a8e38a86 -> 80070f277
SENTRY-1695: Waiting for HMS notifications from Thrift should be interruptible (Alex Kolbasov, reviewed by: Hao Hao and Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/80070f27 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/80070f27 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/80070f27 Branch: refs/heads/sentry-ha-redesign Commit: 80070f2775652b2d3dd9a7e060519a96ef093b01 Parents: 5a8e38a Author: Alexander Kolbasov <[email protected]> Authored: Wed May 3 12:43:47 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Wed May 3 12:44:06 2017 -0700 ---------------------------------------------------------------------- .../service/thrift/SentryPolicyStoreProcessor.java | 9 +++++++-- .../apache/sentry/service/thrift/CounterWait.java | 9 +++++---- .../sentry/service/thrift/TestCounterWait.java | 16 ++++++++++------ 3 files changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/80070f27/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index d38b1eb..ad23334 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -1149,12 +1149,17 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { @Override public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request) throws TException { + TSentrySyncIDResponse response = new TSentrySyncIDResponse(); try (Timer.Context timerContext = hmsWaitTimer.time()) { // Wait until Sentry Server processes specified HMS Notification ID. - TSentrySyncIDResponse response = new TSentrySyncIDResponse(); response.setId(sentryStore.getCounterWait().waitFor(request.getId())); response.setStatus(Status.OK()); - return response; + } catch (InterruptedException e) { + String msg = String.format("wait request for id %d is interrupted", + request.getId()); + LOGGER.error(msg, e); + response.setStatus(Status.RuntimeError(msg, e)); } + return response; } } http://git-wip-us.apache.org/repos/asf/sentry/blob/80070f27/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java index f593bff..2b4ee84 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java @@ -117,8 +117,9 @@ public final class CounterWait { * @param value requested counter value * @return current counter value that should be no smaller then the requested * value + * @throws InterruptedException if the wait was interrupted */ - public long waitFor(long value) { + public long waitFor(long value) throws InterruptedException { // Fast path - counter value already reached, no need to block if (value <= currentId.get()) { return currentId.get(); @@ -220,9 +221,9 @@ public final class CounterWait { semaphore.acquireUninterruptibly(); // Will not block } - /** Wait until signaled. May return immediately if already signalled. */ - void waitFor() { - semaphore.acquireUninterruptibly(); + /** Wait until signaled or interrupted. May return immediately if already signalled. */ + void waitFor() throws InterruptedException { + semaphore.acquire(); } /** @return the value we are waiting for */ http://git-wip-us.apache.org/repos/asf/sentry/blob/80070f27/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java index a700178..1b732da 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java @@ -30,12 +30,11 @@ import java.util.concurrent.LinkedBlockingDeque; */ public class TestCounterWait extends TestCase { // Used to verify that wakeups happen in the right order - private BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>(); - // Number of waiters to test for - private int nthreads = 20; + private final BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>(); public void testWaitFor() throws Exception { // Create a thread for each waiter + int nthreads = 20; ExecutorService executor = Executors.newFixedThreadPool(nthreads); final CounterWait waiter = new CounterWait(); @@ -46,11 +45,16 @@ public class TestCounterWait extends TestCase { // Create a pair of threads waiting for each value in [1, nthreads / 2] // We use pair of threads per value to verify that both are waken up for (int i = 0; i < nthreads; i++) { - final int finalI = i + 2; + int finalI = i + 2; final int val = finalI / 2; executor.execute(new Runnable() { public void run() { - long r = waiter.waitFor(val); // blocks + long r = 0; + try { + r = waiter.waitFor(val); // blocks + } catch (InterruptedException e) { + e.printStackTrace(); + } outSyncQueue.add(r); // Once we wake up, post result } } @@ -68,7 +72,7 @@ public class TestCounterWait extends TestCase { // Post a counter update for each value in [ 1, nthreads / 2 ] // After eac update two threads should be waken up and the corresponding pair of // values should appear in the outSyncQueue. - for (int i = 0; i < nthreads / 2; i++) { + for (int i = 0; i < (nthreads / 2); i++) { waiter.update(i + 1); long r = outSyncQueue.takeFirst(); assertEquals(r, i + 1);
