Repository: curator Updated Branches: refs/heads/CURATOR-397 f7d410f8e -> 09f9bc06a
Added asyncEnsureContainers Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/09f9bc06 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/09f9bc06 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/09f9bc06 Branch: refs/heads/CURATOR-397 Commit: 09f9bc06accf7c42d3a5dcf3d92c20d706e56ea6 Parents: f7d410f Author: randgalt <[email protected]> Authored: Wed Jun 28 09:46:32 2017 -0500 Committer: randgalt <[email protected]> Committed: Wed Jun 28 09:46:32 2017 -0500 ---------------------------------------------------------------------- .../org/apache/curator/x/async/AsyncLocker.java | 251 ---------------- .../apache/curator/x/async/AsyncWrappers.java | 296 +++++++++++++++++++ .../apache/curator/x/async/TestAsyncLocker.java | 73 ----- .../curator/x/async/TestAsyncWrappers.java | 73 +++++ 4 files changed, 369 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java deleted file mode 100644 index b15fd4b..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async; - -import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.apache.curator.utils.ThreadUtils; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; - -/** - * <p> - * Utility for acquiring a lock asynchronously - * </p> - * - * <p> - * Canonical usage: - * <code><pre> - * InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock - * AsyncLocker.lockAsync(mutex).thenAccept(dummy -> { - * try - * { - * // do work while holding the lock - * } - * finally - * { - * AsyncLocker.release(mutex); - * } - * }).exceptionally(e -> { - * if ( e instanceOf TimeoutException ) { - * // timed out trying to acquire the lock - * } - * // handle the error - * return null; - * }); - * </pre></code> - * </p> - */ -public class AsyncLocker -{ - /** - * Set as the completion stage's exception when trying to acquire a lock - * times out - */ - public static class TimeoutException extends RuntimeException - { - } - - /** - * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock - * is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException} - * - * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, - * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) - * @param timeout max timeout to acquire lock - * @param unit time unit of timeout - * @param executor executor to use to asynchronously acquire - * @return stage - */ - public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) - { - CompletableFuture<Void> future = new CompletableFuture<>(); - if ( executor == null ) - { - CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit)); - } - else - { - CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor); - } - return future; - } - - /** - * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage - * is completed with a Boolean that indicates whether or not the lock was acquired. - * - * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, - * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) - * @param timeout max timeout to acquire lock - * @param unit time unit of timeout - * @param executor executor to use to asynchronously acquire - * @return stage - */ - public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) - { - CompletableFuture<Boolean> future = new CompletableFuture<>(); - if ( executor == null ) - { - CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit)); - } - else - { - CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor); - } - return future; - } - - /** - * Attempt to acquire the given lock asynchronously using the given executor and without a timeout. - * - * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, - * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) - * @param executor executor to use to asynchronously acquire - * @return stage - */ - public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor) - { - return lockAsync(lock, 0, null, executor); - } - - /** - * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. - * If the lock is not acquired within the timeout stage is completedExceptionally with {@link org.apache.curator.x.async.AsyncLocker.TimeoutException} - * - * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, - * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) - * @param timeout max timeout to acquire lock - * @param unit time unit of timeout - * @return stage - */ - public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit) - { - return lockAsync(lock, timeout, unit, null); - } - - /** - * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. - * The stage is completed with a Boolean that indicates whether or not the lock was acquired. - * - * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, - * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) - * @param timeout max timeout to acquire lock - * @param unit time unit of timeout - * @return stage - */ - public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit) - { - return lockAsyncIf(lock, timeout, unit, null); - } - - /** - * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. - * - * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, - * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) - * @return stage - */ - public static CompletionStage<Void> lockAsync(InterProcessLock lock) - { - return lockAsync(lock, 0, null, null); - } - - /** - * Release the lock and wrap any exception in <code>RuntimeException</code> - * - * @param lock lock to release - */ - public static void release(InterProcessLock lock) - { - release(lock, true); - } - - /** - * Release the lock and wrap any exception in <code>RuntimeException</code> - * - * @param lock lock to release - * @param ignoreNoLockExceptions if true {@link java.lang.IllegalStateException} is ignored - */ - public static void release(InterProcessLock lock, boolean ignoreNoLockExceptions) - { - try - { - lock.release(); - } - catch ( IllegalStateException e ) - { - if ( !ignoreNoLockExceptions ) - { - throw new RuntimeException(e); - } - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - throw new RuntimeException(e); - } - } - - private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit) - { - try - { - future.complete(lock.acquire(timeout, unit)); - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - future.completeExceptionally(e); - } - } - - private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit) - { - try - { - if ( unit != null ) - { - if ( lock.acquire(timeout, unit) ) - { - future.complete(null); - } - else - { - future.completeExceptionally(new TimeoutException()); - } - } - else - { - lock.acquire(); - future.complete(null); - } - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - future.completeExceptionally(e); - } - } - - private AsyncLocker() - { - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java new file mode 100644 index 0000000..8ff507c --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +import org.apache.curator.framework.EnsureContainers; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.x.async.modeled.ZPath; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * <p> + * Utility for adding asynchronous behavior + * </p> + * + * <p> + * E.g. locks: + * <code><pre> + * InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock + * AsyncWrappers.lockAsync(mutex, executor).thenAccept(dummy -> { + * try + * { + * // do work while holding the lock + * } + * finally + * { + * AsyncWrappers.release(mutex); + * } + * }).exceptionally(e -> { + * if ( e instanceOf TimeoutException ) { + * // timed out trying to acquire the lock + * } + * // handle the error + * return null; + * }); + * </pre></code> + * </p> + * + * <p> + * E.g. EnsureContainers + * <code><pre> + * AsyncWrappers.(client, path, executor).thenAccept(dummy -> { + * // execute after ensuring containers + * }); + * </pre></code> + * </p> + */ +public class AsyncWrappers +{ + /** + * Asynchronously call {@link org.apache.curator.framework.EnsureContainers} using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * + * @param client client + * @param path path to ensure + * @return stage + */ + public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path) + { + return asyncEnsureContainers(client, path, null); + } + + /** + * Asynchronously call {@link org.apache.curator.framework.EnsureContainers} using the given executor + * + * @param client client + * @param path path to ensure + * @return stage + */ + public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor) + { + Runnable proc = () -> { + try + { + new EnsureContainers(client.unwrap(), path.fullPath()).ensure(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + }; + return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc); + } + + /** + * Set as the completion stage's exception when trying to acquire a lock + * times out + */ + public static class TimeoutException extends RuntimeException + { + } + + /** + * Attempt to acquire the given lock asynchronously using the given timeout and executor. If the lock + * is not acquired within the timeout stage is completedExceptionally with {@link AsyncWrappers.TimeoutException} + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param timeout max timeout to acquire lock + * @param unit time unit of timeout + * @param executor executor to use to asynchronously acquire + * @return stage + */ + public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) + { + CompletableFuture<Void> future = new CompletableFuture<>(); + if ( executor == null ) + { + CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit)); + } + else + { + CompletableFuture.runAsync(() -> lock(future, lock, timeout, unit), executor); + } + return future; + } + + /** + * Attempt to acquire the given lock asynchronously using the given timeout and executor. The stage + * is completed with a Boolean that indicates whether or not the lock was acquired. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param timeout max timeout to acquire lock + * @param unit time unit of timeout + * @param executor executor to use to asynchronously acquire + * @return stage + */ + public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) + { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + if ( executor == null ) + { + CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit)); + } + else + { + CompletableFuture.runAsync(() -> lockIf(future, lock, timeout, unit), executor); + } + return future; + } + + /** + * Attempt to acquire the given lock asynchronously using the given executor and without a timeout. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param executor executor to use to asynchronously acquire + * @return stage + */ + public static CompletionStage<Void> lockAsync(InterProcessLock lock, Executor executor) + { + return lockAsync(lock, 0, null, executor); + } + + /** + * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * If the lock is not acquired within the timeout stage is completedExceptionally with {@link AsyncWrappers.TimeoutException} + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param timeout max timeout to acquire lock + * @param unit time unit of timeout + * @return stage + */ + public static CompletionStage<Void> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit) + { + return lockAsync(lock, timeout, unit, null); + } + + /** + * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * The stage is completed with a Boolean that indicates whether or not the lock was acquired. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param timeout max timeout to acquire lock + * @param unit time unit of timeout + * @return stage + */ + public static CompletionStage<Boolean> lockAsyncIf(InterProcessLock lock, long timeout, TimeUnit unit) + { + return lockAsyncIf(lock, timeout, unit, null); + } + + /** + * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @return stage + */ + public static CompletionStage<Void> lockAsync(InterProcessLock lock) + { + return lockAsync(lock, 0, null, null); + } + + /** + * Release the lock and wrap any exception in <code>RuntimeException</code> + * + * @param lock lock to release + */ + public static void release(InterProcessLock lock) + { + release(lock, true); + } + + /** + * Release the lock and wrap any exception in <code>RuntimeException</code> + * + * @param lock lock to release + * @param ignoreNoLockExceptions if true {@link java.lang.IllegalStateException} is ignored + */ + public static void release(InterProcessLock lock, boolean ignoreNoLockExceptions) + { + try + { + lock.release(); + } + catch ( IllegalStateException e ) + { + if ( !ignoreNoLockExceptions ) + { + throw new RuntimeException(e); + } + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + throw new RuntimeException(e); + } + } + + private static void lockIf(CompletableFuture<Boolean> future, InterProcessLock lock, long timeout, TimeUnit unit) + { + try + { + future.complete(lock.acquire(timeout, unit)); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + future.completeExceptionally(e); + } + } + + private static void lock(CompletableFuture<Void> future, InterProcessLock lock, long timeout, TimeUnit unit) + { + try + { + if ( unit != null ) + { + if ( lock.acquire(timeout, unit) ) + { + future.complete(null); + } + else + { + future.completeExceptionally(new TimeoutException()); + } + } + else + { + lock.acquire(); + future.complete(null); + } + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + future.completeExceptionally(e); + } + } + + private AsyncWrappers() + { + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java deleted file mode 100644 index 2553620..0000000 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.apache.curator.retry.RetryOneTime; -import org.testng.Assert; -import org.testng.annotations.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class TestAsyncLocker extends CompletableBaseClassForTests -{ - @Test - public void testBasic() - { - try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) - { - client.start(); - - InterProcessMutex lock = new InterProcessMutex(client, "/one/two"); - complete(AsyncLocker.lockAsync(lock), (__, e) -> { - Assert.assertNull(e); - AsyncLocker.release(lock); - }); - } - } - - @Test - public void testContention() throws Exception - { - try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) - { - client.start(); - - InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two"); - InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two"); - CountDownLatch latch = new CountDownLatch(1); - AsyncLocker.lockAsync(lock1).thenAccept(__ -> { - latch.countDown(); // don't release the lock - }); - Assert.assertTrue(timing.awaitLatch(latch)); - - CountDownLatch latch2 = new CountDownLatch(1); - AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> { - if ( e instanceof AsyncLocker.TimeoutException ) - { - latch2.countDown(); // lock should still be held - } - return null; - }); - Assert.assertTrue(timing.awaitLatch(latch2)); - } - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/09f9bc06/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java new file mode 100644 index 0000000..7ce7904 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncWrappers.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.RetryOneTime; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TestAsyncWrappers extends CompletableBaseClassForTests +{ + @Test + public void testBasic() + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + InterProcessMutex lock = new InterProcessMutex(client, "/one/two"); + complete(AsyncWrappers.lockAsync(lock), (__, e) -> { + Assert.assertNull(e); + AsyncWrappers.release(lock); + }); + } + } + + @Test + public void testContention() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two"); + InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two"); + CountDownLatch latch = new CountDownLatch(1); + AsyncWrappers.lockAsync(lock1).thenAccept(__ -> { + latch.countDown(); // don't release the lock + }); + Assert.assertTrue(timing.awaitLatch(latch)); + + CountDownLatch latch2 = new CountDownLatch(1); + AsyncWrappers.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> { + if ( e instanceof AsyncWrappers.TimeoutException ) + { + latch2.countDown(); // lock should still be held + } + return null; + }); + Assert.assertTrue(timing.awaitLatch(latch2)); + } + } +}
