Added WatcherRemoveCuratorFramework to locks and updated tests to check for cleanliness
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f0a09db4 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f0a09db4 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f0a09db4 Branch: refs/heads/CURATOR-3.0 Commit: f0a09db4423f06455ed93c20778c65aaf7e8b06e Parents: 2c921d6 Author: randgalt <[email protected]> Authored: Tue May 19 22:42:14 2015 -0700 Committer: randgalt <[email protected]> Committed: Tue May 19 22:42:14 2015 -0700 ---------------------------------------------------------------------- .../framework/imps/WatcherRemovalFacade.java | 3 +- .../locks/InterProcessSemaphoreMutex.java | 6 +- .../recipes/locks/InterProcessSemaphoreV2.java | 54 +++-- .../framework/recipes/locks/LockInternals.java | 9 +- .../locks/TestInterProcessMultiMutex.java | 7 +- .../recipes/locks/TestInterProcessMutex.java | 5 +- .../locks/TestInterProcessMutexBase.java | 23 +- .../locks/TestInterProcessReadWriteLock.java | 223 +++++++++++-------- .../locks/TestInterProcessSemaphore.java | 27 ++- .../locks/TestInterProcessSemaphoreCluster.java | 3 +- .../framework/recipes/locks/TestLockACLs.java | 3 +- .../locks/TestLockCleanlinessWithFaults.java | 3 +- 12 files changed, 213 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index eee423f..156341e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import org.apache.curator.CuratorZookeeperClient; @@ -45,7 +46,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove @Override public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() { - throw new UnsupportedOperationException(); + return client.newWatcherRemoveCuratorFramework(); } WatcherRemovalManager getRemovalManager() http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java index 88b5f5d..444b10d 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; import java.util.concurrent.TimeUnit; /** @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; public class InterProcessSemaphoreMutex implements InterProcessLock { private final InterProcessSemaphoreV2 semaphore; + private final WatcherRemoveCuratorFramework watcherRemoveClient; private volatile Lease lease; /** @@ -37,7 +39,8 @@ public class InterProcessSemaphoreMutex implements InterProcessLock */ public InterProcessSemaphoreMutex(CuratorFramework client, String path) { - this.semaphore = new InterProcessSemaphoreV2(client, path, 1); + watcherRemoveClient = client.newWatcherRemoveCuratorFramework(); + this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1); } @Override @@ -66,6 +69,7 @@ public class InterProcessSemaphoreMutex implements InterProcessLock try { lease.close(); + watcherRemoveClient.removeWatchers(); } finally { http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java index 2e14ee1..2a55107 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.RetryLoop; import org.apache.curator.framework.CuratorFramework; @@ -75,7 +76,7 @@ public class InterProcessSemaphoreV2 { private final Logger log = LoggerFactory.getLogger(getClass()); private final InterProcessMutex lock; - private final CuratorFramework client; + private final WatcherRemoveCuratorFramework client; private final String leasesPath; private final Watcher watcher = new Watcher() { @@ -115,7 +116,7 @@ public class InterProcessSemaphoreV2 private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) { - this.client = client; + this.client = client.newWatcherRemoveCuratorFramework(); path = PathUtils.validatePath(path); lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT)); this.maxLeases = (count != null) ? count.getCount() : maxLeases; @@ -345,36 +346,43 @@ public class InterProcessSemaphoreV2 String nodeName = ZKPaths.getNodeFromPath(path); builder.add(makeLease(path)); - synchronized(this) + try { - for(;;) + synchronized(this) { - List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); - if ( !children.contains(nodeName) ) + for(;;) { - log.error("Sequential path not found: " + path); - return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; - } + List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); + if ( !children.contains(nodeName) ) + { + log.error("Sequential path not found: " + path); + return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; + } - if ( children.size() <= maxLeases ) - { - break; - } - if ( hasWait ) - { - long thisWaitMs = getThisWaitMs(startMs, waitMs); - if ( thisWaitMs <= 0 ) + if ( children.size() <= maxLeases ) { - return InternalAcquireResult.RETURN_NULL; + break; + } + if ( hasWait ) + { + long thisWaitMs = getThisWaitMs(startMs, waitMs); + if ( thisWaitMs <= 0 ) + { + return InternalAcquireResult.RETURN_NULL; + } + wait(thisWaitMs); + } + else + { + wait(); } - wait(thisWaitMs); - } - else - { - wait(); } } } + finally + { + client.removeWatchers(); + } } finally { http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java index 2b4d3d9..4b0da11 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java @@ -24,11 +24,11 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.curator.RetryLoop; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; public class LockInternals { - private final CuratorFramework client; + private final WatcherRemoveCuratorFramework client; private final String path; private final String basePath; private final LockInternalsDriver driver; @@ -100,7 +100,7 @@ public class LockInternals this.lockName = lockName; this.maxLeases = maxLeases; - this.client = client; + this.client = client.newWatcherRemoveCuratorFramework(); this.basePath = PathUtils.validatePath(path); this.path = ZKPaths.makePath(path, lockName); } @@ -116,8 +116,9 @@ public class LockInternals revocable.set(entry); } - void releaseLock(String lockPath) throws Exception + final void releaseLock(String lockPath) throws Exception { + client.removeWatchers(); revocable.set(null); deleteOurPath(lockPath); } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java index b1631a0..df6a2f5 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.retry.RetryOneTime; import org.testng.Assert; import org.testng.annotations.Test; @@ -84,13 +85,14 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase } catch ( Exception e ) { + // ignore } Assert.assertFalse(goodLock.isAcquiredInThisProcess()); Assert.assertTrue(otherGoodLock.isAcquiredInThisProcess()); } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -142,13 +144,14 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase } catch ( Exception e ) { + // ignore } Assert.assertFalse(goodLock.isAcquiredInThisProcess()); Assert.assertTrue(goodLockWasLocked.get()); } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java index 453de33..d6f8a1d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.KillSession; import org.apache.zookeeper.CreateMode; @@ -106,7 +107,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -151,7 +152,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java index 3fe8110..49e5d19 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java @@ -20,16 +20,14 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.collect.Lists; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.KillSession; -import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.testng.Assert; import org.testng.annotations.Test; @@ -123,7 +121,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -199,7 +197,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -265,7 +263,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -311,7 +309,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -328,7 +326,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -460,11 +458,14 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests Assert.assertTrue(acquiredLatchForClient1.await(10, TimeUnit.SECONDS)); Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess()); } + + future1.get(); + future2.get(); } finally { - CloseableUtils.closeQuietly(client1); - CloseableUtils.closeQuietly(client2); + TestCleanState.closeAndTestClean(client1); + TestCleanState.closeAndTestClean(client2); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java index f7636ed..48e4805 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java @@ -16,14 +16,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.locks; import com.google.common.collect.Lists; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Collection; @@ -31,6 +32,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -40,21 +42,22 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestInterProcessReadWriteLock extends BaseClassForTests { @Test - public void testGetParticipantNodes() throws Exception + public void testGetParticipantNodes() throws Exception { - final int READERS = 20; - final int WRITERS = 8; + final int READERS = 20; + final int WRITERS = 8; - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); try { client.start(); - final CountDownLatch latch = new CountDownLatch(READERS + WRITERS); - final CountDownLatch readLatch = new CountDownLatch(READERS); - final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); + final CountDownLatch latch = new CountDownLatch(READERS + WRITERS); + final CountDownLatch readLatch = new CountDownLatch(READERS); + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); - ExecutorService service = Executors.newCachedThreadPool(); + final CountDownLatch exitLatch = new CountDownLatch(1); + ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); for ( int i = 0; i < READERS; ++i ) { service.submit @@ -65,8 +68,16 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests public Void call() throws Exception { lock.readLock().acquire(); - latch.countDown(); - readLatch.countDown(); + try + { + latch.countDown(); + readLatch.countDown(); + exitLatch.await(); + } + finally + { + lock.readLock().release(); + } return null; } } @@ -84,6 +95,14 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS)); latch.countDown(); // must be before as there can only be one writer lock.writeLock().acquire(); + try + { + exitLatch.await(); + } + finally + { + lock.writeLock().release(); + } return null; } } @@ -97,22 +116,28 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests Assert.assertEquals(readers.size(), READERS); Assert.assertEquals(writers.size(), WRITERS); + + exitLatch.countDown(); + for ( int i = 0; i < (READERS + WRITERS); ++i ) + { + service.take().get(); + } } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @Test - public void testThatUpgradingIsDisallowed() throws Exception + public void testThatUpgradingIsDisallowed() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); try { client.start(); - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); + InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); lock.readLock().acquire(); Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS)); @@ -120,70 +145,80 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @Test - public void testThatDowngradingRespectsThreads() throws Exception + public void testThatDowngradingRespectsThreads() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); try { client.start(); - final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); - ExecutorService t1 = Executors.newSingleThreadExecutor(); - ExecutorService t2 = Executors.newSingleThreadExecutor(); + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); + ExecutorService t1 = Executors.newSingleThreadExecutor(); + ExecutorService t2 = Executors.newSingleThreadExecutor(); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); - Future<Object> f1 = t1.submit - ( - new Callable<Object>() - { - @Override - public Object call() throws Exception + final CountDownLatch releaseLatch = new CountDownLatch(1); + Future<Object> f1 = t1.submit + ( + new Callable<Object>() { - lock.writeLock().acquire(); - latch.countDown(); - return null; + @Override + public Object call() throws Exception + { + lock.writeLock().acquire(); + latch.countDown(); + try + { + releaseLatch.await(); + } + finally + { + lock.writeLock().release(); + } + return null; + } } - } - ); + ); - Future<Object> f2 = t2.submit - ( - new Callable<Object>() - { - @Override - public Object call() throws Exception + Future<Object> f2 = t2.submit + ( + new Callable<Object>() { - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); - Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS)); - return null; + @Override + public Object call() throws Exception + { + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS)); + return null; + } } - } - ); + ); - f1.get(); f2.get(); + releaseLatch.countDown(); + f1.get(); } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @Test - public void testDowngrading() throws Exception + public void testDowngrading() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); try { client.start(); - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); + InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); lock.writeLock().acquire(); Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS)); lock.writeLock().release(); @@ -192,60 +227,60 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @Test - public void testBasic() throws Exception + public void testBasic() throws Exception { - final int CONCURRENCY = 8; - final int ITERATIONS = 100; + final int CONCURRENCY = 8; + final int ITERATIONS = 100; - final Random random = new Random(); - final AtomicInteger concurrentCount = new AtomicInteger(0); - final AtomicInteger maxConcurrentCount = new AtomicInteger(0); - final AtomicInteger writeCount = new AtomicInteger(0); - final AtomicInteger readCount = new AtomicInteger(0); + final Random random = new Random(); + final AtomicInteger concurrentCount = new AtomicInteger(0); + final AtomicInteger maxConcurrentCount = new AtomicInteger(0); + final AtomicInteger writeCount = new AtomicInteger(0); + final AtomicInteger readCount = new AtomicInteger(0); - List<Future<Void>> futures = Lists.newArrayList(); - ExecutorService service = Executors.newCachedThreadPool(); + List<Future<Void>> futures = Lists.newArrayList(); + ExecutorService service = Executors.newCachedThreadPool(); for ( int i = 0; i < CONCURRENCY; ++i ) { - Future<Void> future = service.submit - ( - new Callable<Void>() - { - @Override - public Void call() throws Exception + Future<Void> future = service.submit + ( + new Callable<Void>() { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - client.start(); - try + @Override + public Void call() throws Exception { - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); - for ( int i = 0; i < ITERATIONS; ++i ) + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try { - if ( random.nextInt(100) < 10 ) + InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); + for ( int i = 0; i < ITERATIONS; ++i ) { - doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1); - writeCount.incrementAndGet(); - } - else - { - doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE); - readCount.incrementAndGet(); + if ( random.nextInt(100) < 10 ) + { + doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1); + writeCount.incrementAndGet(); + } + else + { + doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE); + readCount.incrementAndGet(); + } } } + finally + { + TestCleanState.closeAndTestClean(client); + } + return null; } - finally - { - CloseableUtils.closeQuietly(client); - } - return null; } - } - ); + ); futures.add(future); } @@ -262,17 +297,17 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests } @Test - public void testSetNodeData() throws Exception + public void testSetNodeData() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); try { client.start(); - final byte[] nodeData = new byte[] { 1, 2, 3, 4 }; + final byte[] nodeData = new byte[]{1, 2, 3, 4}; - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData); + InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData); // mutate passed-in node data, lock has made copy nodeData[0] = 5; @@ -284,13 +319,13 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests byte dataInZk[] = client.getData().forPath("/lock/" + children.get(0)); Assert.assertNotNull(dataInZk); - Assert.assertEquals(new byte[] { 1, 2, 3, 4 }, dataInZk); + Assert.assertEquals(new byte[]{1, 2, 3, 4}, dataInZk); lock.writeLock().release(); } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -299,7 +334,7 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests try { Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS)); - int localConcurrentCount; + int localConcurrentCount; synchronized(this) { localConcurrentCount = concurrentCount.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java index dd3f98f..2797b5f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java @@ -20,13 +20,14 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.collect.Lists; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; import java.util.Collection; @@ -100,10 +101,12 @@ public class TestInterProcessSemaphore extends BaseClassForTests future1.get(); future2.get(); + + count.close(); } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -140,8 +143,8 @@ public class TestInterProcessSemaphore extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client1); - CloseableUtils.closeQuietly(client2); + TestCleanState.closeAndTestClean(client1); + TestCleanState.closeAndTestClean(client2); } } @@ -226,7 +229,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } return null; } @@ -299,7 +302,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } return null; } @@ -401,7 +404,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -445,7 +448,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -463,7 +466,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests } finally { - client.close(); + TestCleanState.closeAndTestClean(client); } } @@ -499,7 +502,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests { CloseableUtils.closeQuietly(l); } - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -528,7 +531,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests { CloseableUtils.closeQuietly(l); } - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java index 2aa8a72..f4cb7bb 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -147,7 +148,7 @@ public class TestInterProcessSemaphoreCluster } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } return null; } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java index 2d9a9aa..d1e6db5 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.locks; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.RetryPolicy; @@ -74,7 +75,7 @@ public class TestLockACLs extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java index 457be75..dc14c11 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.utils.CloseableUtils; @@ -67,7 +68,7 @@ public class TestLockCleanlinessWithFaults extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } }
