Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 fbf25adcf -> f0bcd0476
Added lockAsyncIf to AsyncLock

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f0bcd047
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f0bcd047
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f0bcd047

Branch: refs/heads/CURATOR-397
Commit: f0bcd0476e30a876f7995d191d8811fadb1a201e
Parents: fbf25ad
Author: randgalt <[email protected]>
Authored: Tue Jun 13 07:34:25 2017 -0500
Committer: randgalt <[email protected]>
Committed: Tue Jun 13 07:34:25 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/async/AsyncLocker.java | 57 +++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/f0bcd047/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
index 42f7041..e04c11d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
@@ -64,7 +64,8 @@ public class AsyncLocker
     }
 
     /**
-     * Attempt to acquire the given lock asynchronously using the given timeout and executor.
+     * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock
+     * is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException}
      *
      * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
      * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
@@ -88,6 +89,31 @@ public class AsyncLocker
     }
 
     /**
+     * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage
+     * is completed with a Boolean that indicates whether or not the lock was acquired.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @param executor executor to use to asynchronously acquire
+     * @return stage
+     */
+    public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+    {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        if ( executor == null )
+        {
+            CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit));
+        }
+        else
+        {
+            CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor);
+        }
+        return future;
+    }
+
+    /**
      * Attempt to acquire the given lock asynchronously using the given executor and without a timeout.
      *
      * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
@@ -102,6 +128,7 @@ public class AsyncLocker
 
     /**
      * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     * If the lock is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException}
      *
      * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
      * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
@@ -115,6 +142,21 @@ public class AsyncLocker
     }
 
     /**
+     * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     * The stage is completed with a Boolean that indicates whether or not the lock was acquired.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @return stage
+     */
+    public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        return lockAsyncIf(lock, timeout, unit, null);
+    }
+
+    /**
      * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
      *
      * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
@@ -144,6 +186,19 @@ public class AsyncLocker
         }
     }
 
+    private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        try
+        {
+            future.complete(lock.acquire(timeout, unit));
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            future.completeExceptionally(e);
+        }
+    }
+
     private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit)
     {
         try
