Repository: curator Updated Branches: refs/heads/CURATOR-397 93f11ed09 -> e943763f0
Added AsyncLocker Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e943763f Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e943763f Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e943763f Branch: refs/heads/CURATOR-397 Commit: e943763f0d299b5e69e6a7f4e871024b7aa95503 Parents: 93f11ed Author: randgalt <[email protected]> Authored: Sat Jun 10 20:46:19 2017 -0500 Committer: randgalt <[email protected]> Committed: Sat Jun 10 20:46:19 2017 -0500 ---------------------------------------------------------------------- .../org/apache/curator/x/async/AsyncLocker.java | 196 +++++++++++++++++++ .../apache/curator/x/async/TestAsyncLocker.java | 76 +++++++ 2 files changed, 272 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/e943763f/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java new file mode 100644 index 0000000..8b65a4a --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.utils.ThreadUtils; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * <p> + * Utility for acquiring a lock asynchronously + * </p> + * + * <p> + * Canonical usage: + * <code><pre> + * InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock + * AsyncLocker.lockAsync(mutex).handle((state, e) -> { + * if ( e != null ) + * { + * // handle the error + * } + * else if ( state.hasTheLock() ) + * { + * try + * { + * // do work while holding the lock + * } + * finally + * { + * state.release(); + * } + * } + * }); + * </pre></code> + * </p> + */ +public class AsyncLocker +{ + /** + * State of the lock + */ + public interface LockState + { + /** + * Returns true if you own the lock + * + * @return true/false + */ + boolean hasTheLock(); + + /** + * Safe release of the lock. Only tries to release + * if you own the lock. The lock ownership is changed + * to <code>false</code> by this method. + */ + void release(); + } + + /** + * Attempt to acquire the given lock asynchronously using the given timeout and executor. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param timeout max timeout to acquire lock + * @param unit time unit of timeout + * @param executor executor to use to asynchronously acquire + * @return stage + */ + public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor) + { + if ( executor == null ) + { + return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit)); + } + return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit), executor); + } + + /** + * Attempt to acquire the given lock asynchronously using the given executor and without a timeout. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param executor executor to use to asynchronously acquire + * @return stage + */ + public static CompletionStage<LockState> lockAsync(InterProcessLock lock, Executor executor) + { + return lockAsync(lock, 0, null, executor); + } + + /** + * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @param timeout max timeout to acquire lock + * @param unit time unit of timeout + * @return stage + */ + public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit) + { + return lockAsync(lock, timeout, unit, null); + } + + /** + * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}. + * + * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex}, + * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.) + * @return stage + */ + public static CompletionStage<LockState> lockAsync(InterProcessLock lock) + { + return lockAsync(lock, 0, null, null); + } + + private static LockState lock(InterProcessLock lock, long timeout, TimeUnit unit) + { + try + { + if ( unit != null ) + { + boolean hasTheLock = lock.acquire(timeout, unit); + return new InternalLockState(lock, hasTheLock); + } + + lock.acquire(); + return new InternalLockState(lock, true); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + throw new RuntimeException(e); + } + } + + private AsyncLocker() + { + } + + private static class InternalLockState implements LockState + { + private final InterProcessLock lock; + private volatile boolean hasTheLock; + + public InternalLockState(InterProcessLock lock, boolean hasTheLock) + { + this.lock = lock; + this.hasTheLock = hasTheLock; + } + + @Override + public boolean hasTheLock() + { + return hasTheLock; + } + + @Override + public void release() + { + if ( hasTheLock ) + { + hasTheLock = false; + try + { + lock.release(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + throw new RuntimeException(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/e943763f/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java new file mode 100644 index 0000000..7ea2d08 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.RetryOneTime; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TestAsyncLocker extends CompletableBaseClassForTests +{ + @Test + public void testBasic() + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + InterProcessMutex lock = new InterProcessMutex(client, "/one/two"); + complete(AsyncLocker.lockAsync(lock), (state, e) -> { + Assert.assertNull(e); + Assert.assertTrue(state.hasTheLock()); + state.release(); + }); + } + } + + @Test + public void testContention() throws Exception + { + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two"); + InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two"); + CountDownLatch latch = new CountDownLatch(1); + AsyncLocker.lockAsync(lock1).thenAccept(state -> { + if ( state.hasTheLock() ) + { + latch.countDown(); // don't release the lock + } + }); + Assert.assertTrue(timing.awaitLatch(latch)); + + CountDownLatch latch2 = new CountDownLatch(1); + AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).thenAccept(state -> { + if ( !state.hasTheLock() ) + { + latch2.countDown(); // lock should still be held + } + }); + Assert.assertTrue(timing.awaitLatch(latch2)); + } + } +}
