Repository: curator Updated Branches: refs/heads/CURATOR-397 e943763f0 -> fbf25adcf
Made the API of AsyncLocker cleaner Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fbf25adc Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fbf25adc Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fbf25adc Branch: refs/heads/CURATOR-397 Commit: fbf25adcf88b4198fdecd18a494b615c70bf763c Parents: e943763 Author: randgalt <[email protected]> Authored: Sat Jun 10 22:22:08 2017 -0500 Committer: randgalt <[email protected]> Committed: Sat Jun 10 22:22:08 2017 -0500 ---------------------------------------------------------------------- .../org/apache/curator/x/async/AsyncLocker.java | 124 ++++++++----------- .../apache/curator/x/async/TestAsyncLocker.java | 17 ++- 2 files changed, 60 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/fbf25adc/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java index 8b65a4a..42f7041 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java @@ -34,22 +34,21 @@ import java.util.concurrent.TimeUnit; * Canonical usage: * <code><pre> * InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock - * AsyncLocker.lockAsync(mutex).handle((state, e) -> { - * if ( e != null ) + * AsyncLocker.lockAsync(mutex).thenAccept(dummy -> { + * try * { - * // handle the error + * // do work while holding the lock * } - * else if ( state.hasTheLock() ) + * finally * { - * try - * { - * // do work while holding the lock - * } - * finally - * { - * state.release(); - * } + * AsyncLocker.release(mutex); * } + * }).exceptionally(e -> { + * if ( e instanceOf TimeoutException ) { + * // timed out trying to acquire the lock + * } + * // handle the error + * return null; * }); * </pre></code> * </p> @@ -57,23 +56,11 @@ import java.util.concurrent.TimeUnit; public class AsyncLocker { /** - * State of the lock + * Set as the completion stage's exception when trying to acquire a lock + * times out */ - public interface LockState + public static class TimeoutException extends RuntimeException { - /** - * Returns true if you own the lock - * - * @return true/false - */ - boolean hasTheLock(); - - /** - * Safe release of the lock. Only tries to release - * if you own the lock. The lock ownership is changed - * to <code>false</code> by this method. - */ - void release(); } /** @@ -86,13 +73,18 @@ public class AsyncLocker * @param executor executor to use to asynchronously acquire * @return stage */ - public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) + public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) { + CompletableFuture<Void> future = new CompletableFuture<>(); if ( executor == null ) { - return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit)); + CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit)); } - return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit), executor); + else + { + CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor); + } + return future; } /** @@ -103,7 +95,7 @@ public class AsyncLocker * @param executor executor to use to asynchronously acquire * @return stage */ - public static CompletionStage<LockState> lockAsync(InterProcessLock lock, Executor executor) + public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor) { return lockAsync(lock, 0, null, executor); } @@ -117,7 +109,7 @@ public class AsyncLocker * @param unit time unit of timeout * @return stage */ - public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit) + public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit) { return lockAsync(lock, timeout, unit, null); } @@ -129,23 +121,21 @@ public class AsyncLocker * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) * @return stage */ - public static CompletionStage<LockState> lockAsync(InterProcessLock lock) + public static CompletionStage<Void> lockAsync(InterProcessLock lock) { return lockAsync(lock, 0, null, null); } - private static LockState lock(InterProcessLock lock, long timeout, TimeUnit unit) + /** + * Release the lock and wrap any exception in <code>RuntimeException</code> + * + * @param lock lock to release + */ + public static void release(InterProcessLock lock) { try { - if ( unit != null ) - { - boolean hasTheLock = lock.acquire(timeout, unit); - return new InternalLockState(lock, hasTheLock); - } - - lock.acquire(); - return new InternalLockState(lock, true); + lock.release(); } catch ( Exception e ) { @@ -154,43 +144,35 @@ public class AsyncLocker } } - private AsyncLocker() - { - } - - private static class InternalLockState implements LockState + private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit) { - private final InterProcessLock lock; - private volatile boolean hasTheLock; - - public InternalLockState(InterProcessLock lock, boolean hasTheLock) - { - this.lock = lock; - this.hasTheLock = hasTheLock; - } - - @Override - public boolean hasTheLock() - { - return hasTheLock; - } - - @Override - public void release() + try { - if ( hasTheLock ) + if ( unit != null ) { - hasTheLock = false; - try + if ( lock.acquire(timeout, unit) ) { - lock.release(); + future.complete(null); } - catch ( Exception e ) + else { - ThreadUtils.checkInterrupted(e); - throw new RuntimeException(e); + future.completeExceptionally(new TimeoutException()); } } + else + { + lock.acquire(); + future.complete(null); + } } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + future.completeExceptionally(e); + } + } + + private AsyncLocker() + { } } http://git-wip-us.apache.org/repos/asf/curator/blob/fbf25adc/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java index 7ea2d08..2553620 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java @@ -37,10 +37,9 @@ public class TestAsyncLocker extends CompletableBaseClassForTests client.start(); InterProcessMutex lock = new InterProcessMutex(client, "/one/two"); - complete(AsyncLocker.lockAsync(lock), (state, e) -> { + complete(AsyncLocker.lockAsync(lock), (__, e) -> { Assert.assertNull(e); - Assert.assertTrue(state.hasTheLock()); - state.release(); + AsyncLocker.release(lock); }); } } @@ -55,20 +54,18 @@ public class TestAsyncLocker extends CompletableBaseClassForTests InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two"); InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two"); CountDownLatch latch = new CountDownLatch(1); - AsyncLocker.lockAsync(lock1).thenAccept(state -> { - if ( state.hasTheLock() ) - { - latch.countDown(); // don't release the lock - } + AsyncLocker.lockAsync(lock1).thenAccept(__ -> { + latch.countDown(); // don't release the lock }); Assert.assertTrue(timing.awaitLatch(latch)); CountDownLatch latch2 = new CountDownLatch(1); - AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).thenAccept(state -> { - if ( !state.hasTheLock() ) + AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> { + if ( e instanceof AsyncLocker.TimeoutException ) { latch2.countDown(); // lock should still be held } + return null; }); Assert.assertTrue(timing.awaitLatch(latch2)); }
