Repository: curator Updated Branches: refs/heads/CURATOR-88 ae4813512 -> 203b49d90
more read/write tests Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/203b49d9 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/203b49d9 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/203b49d9 Branch: refs/heads/CURATOR-88 Commit: 203b49d905b34a7c3b17ab7553baea0c5cb1f63a Parents: ae48135 Author: randgalt <randg...@apache.org> Authored: Fri Mar 7 07:39:19 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Mar 7 07:39:19 2014 -0500 ---------------------------------------------------------------------- .../locks/TestInterProcessReadWriteLock.java | 161 +++++++++++++++ .../TestInterProcessReadWriteLockBase.java | 207 +++++++------------ .../TestInterProcessSemaphoreReadWrite.java | 26 +-- 3 files changed, 243 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java index 91e1ce0..49fbb55 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java @@ -20,9 +20,170 @@ package org.apache.curator.framework.recipes.locks; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class TestInterProcessReadWriteLock extends TestInterProcessReadWriteLockBase { + @Test + public void testGetParticipantNodes() throws Exception + { + final int READERS = 20; + final int WRITERS = 8; + + final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + final CountDownLatch latch = new CountDownLatch(READERS + WRITERS); + final CountDownLatch readLatch = new CountDownLatch(READERS); + + ExecutorService service = Executors.newCachedThreadPool(); + for ( int i = 0; i < READERS; ++i ) + { + service.submit(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + lock.readLock().acquire(); + latch.countDown(); + readLatch.countDown(); + return null; + } + }); + } + for ( int i = 0; i < WRITERS; ++i ) + { + service.submit(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS)); + latch.countDown(); // must be before as there can only be one writer + lock.writeLock().acquire(); + return null; + } + }); + } + + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + new Timing().sleepABit(); + + InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + Collection<String> readers = lock.readLock().getParticipantNodes(); + Collection<String> writers = lock.writeLock().getParticipantNodes(); + + Assert.assertEquals(readers.size(), READERS); + Assert.assertEquals(writers.size(), WRITERS); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testThatUpgradingIsDisallowed() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + lock.readLock().acquire(); + Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS)); + + lock.readLock().release(); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testThatDowngradingRespectsThreads() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + final InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + ExecutorService t1 = Executors.newSingleThreadExecutor(); + ExecutorService t2 = Executors.newSingleThreadExecutor(); + + final CountDownLatch latch = new CountDownLatch(1); + + Future<Object> f1 = t1.submit(new Callable<Object>() + { + @Override + public Object call() throws Exception + { + lock.writeLock().acquire(); + latch.countDown(); + return null; + } + }); + + Future<Object> f2 = t2.submit(new Callable<Object>() + { + @Override + public Object call() throws Exception + { + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS)); + return null; + } + }); + + f1.get(); + f2.get(); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testDowngrading() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + lock.writeLock().acquire(); + Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS)); + lock.writeLock().release(); + + lock.readLock().release(); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + @Override protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path) { http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java index 81a6b14..0bd36d8 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java @@ -19,7 +19,6 @@ package org.apache.curator.framework.recipes.locks; -import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.BaseClassForTests; @@ -28,11 +27,9 @@ import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.Collection; -import java.util.List; import java.util.Random; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -42,150 +39,104 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTests { @Test - public void testGetParticipantNodes() throws Exception + public void testWriterAgainstConstantReaders() throws Exception { - final int READERS = 20; - final int WRITERS = 8; + final int CONCURRENCY = 8; + final int WRITER_ATTEMPTS = 10; - final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - try + ExecutorService service = Executors.newCachedThreadPool(); + ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(service); + for ( int i = 0; i < CONCURRENCY; ++i ) { - client.start(); - - final CountDownLatch latch = new CountDownLatch(READERS + WRITERS); - final CountDownLatch readLatch = new CountDownLatch(READERS); - - ExecutorService service = Executors.newCachedThreadPool(); - for ( int i = 0; i < READERS; ++i ) + completionService.submit(new Callable<Void>() { - service.submit(new Callable<Void>() + @Override + public Void call() throws Exception { - @Override - public Void call() throws Exception + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try { InterProcessReadWriteLockBase lock = newLock(client, "/lock"); - lock.readLock().acquire(); - latch.countDown(); - readLatch.countDown(); - return null; + try + { + while ( !Thread.currentThread().isInterrupted() ) + { + lock.readLock().acquire(); + try + { + Thread.sleep(100); + } + finally + { + lock.readLock().release(); + } + } + } + catch ( InterruptedException dummy ) + { + Thread.currentThread().interrupt(); + } } - }); - } - for ( int i = 0; i < WRITERS; ++i ) - { - service.submit(new Callable<Void>() - { - @Override - public Void call() throws Exception + finally { - InterProcessReadWriteLockBase lock = newLock(client, "/lock"); - Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS)); - latch.countDown(); // must be before as there can only be one writer - lock.writeLock().acquire(); - return null; + CloseableUtils.closeQuietly(client); } - }); - } - - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); - new Timing().sleepABit(); - - InterProcessReadWriteLockBase lock = newLock(client, "/lock"); - Collection<String> readers = lock.readLock().getParticipantNodes(); - Collection<String> writers = lock.writeLock().getParticipantNodes(); - - Assert.assertEquals(readers.size(), READERS); - Assert.assertEquals(writers.size(), WRITERS); - } - finally - { - CloseableUtils.closeQuietly(client); + return null; + } + }); } - } - - @Test - public void testThatUpgradingIsDisallowed() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - try - { - client.start(); - InterProcessReadWriteLockBase lock = newLock(client, "/lock"); - lock.readLock().acquire(); - Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS)); + new Timing().sleepABit(); - lock.readLock().release(); - } - finally + final AtomicInteger writerCount = new AtomicInteger(); + Future<Void> writerThread = completionService.submit(new Callable<Void>() { - CloseableUtils.closeQuietly(client); - } - } - - @Test - public void testThatDowngradingRespectsThreads() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - try - { - client.start(); - - final InterProcessReadWriteLockBase lock = newLock(client, "/lock"); - ExecutorService t1 = Executors.newSingleThreadExecutor(); - ExecutorService t2 = Executors.newSingleThreadExecutor(); - - final CountDownLatch latch = new CountDownLatch(1); - - Future<Object> f1 = t1.submit(new Callable<Object>() + @Override + public Void call() throws Exception { - @Override - public Object call() throws Exception + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try { - lock.writeLock().acquire(); - latch.countDown(); - return null; + InterProcessReadWriteLockBase lock = newLock(client, "/lock"); + for ( int i = 0; i < WRITER_ATTEMPTS; ++i ) + { + if ( !lock.writeLock().acquire(10, TimeUnit.SECONDS) ) + { + throw new Exception("Could not get write lock"); + } + try + { + writerCount.incrementAndGet(); + Thread.sleep(100); + } + finally + { + lock.writeLock().release(); + } + } } - }); - - Future<Object> f2 = t2.submit(new Callable<Object>() - { - @Override - public Object call() throws Exception + finally { - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); - Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS)); - return null; + CloseableUtils.closeQuietly(client); } - }); - - f1.get(); - f2.get(); - } - finally - { - CloseableUtils.closeQuietly(client); - } - } + return null; + } + }); - @Test - public void testDowngrading() throws Exception - { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + writerThread.get(); try { - client.start(); - - InterProcessReadWriteLockBase lock = newLock(client, "/lock"); - lock.writeLock().acquire(); - Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS)); - lock.writeLock().release(); - - lock.readLock().release(); + Assert.assertEquals(writerCount.get(), WRITER_ATTEMPTS); } finally { - CloseableUtils.closeQuietly(client); + service.shutdownNow(); + for ( int i =0; i < CONCURRENCY; ++i ) + { + completionService.take().get(); + } } } @@ -201,11 +152,11 @@ public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTest final AtomicInteger writeCount = new AtomicInteger(0); final AtomicInteger readCount = new AtomicInteger(0); - List<Future<Void>> futures = Lists.newArrayList(); ExecutorService service = Executors.newCachedThreadPool(); + ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(service); for ( int i = 0; i < CONCURRENCY; ++i ) { - Future<Void> future = service.submit(new Callable<Void>() + completionService.submit(new Callable<Void>() { @Override public Void call() throws Exception @@ -236,12 +187,10 @@ public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTest return null; } }); - futures.add(future); } - - for ( Future<Void> future : futures ) + for ( int i =0; i < CONCURRENCY; ++i ) { - future.get(); + completionService.take().get(); } System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get()); http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java index 7faa198..469cc2c 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java @@ -20,30 +20,12 @@ package org.apache.curator.framework.recipes.locks; import org.apache.curator.framework.CuratorFramework; -import org.testng.annotations.Test; -public class TestInterProcessSemaphoreReadWrite +public class TestInterProcessSemaphoreReadWrite extends TestInterProcessReadWriteLockBase { - @Test - public void testBasic() throws Exception + @Override + protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path) { - TestInterProcessReadWriteLockBase base = new TestInterProcessReadWriteLockBase() - { - @Override - protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path) - { - return new InterProcessSemaphoreReadWrite(client, path); - } - }; - - base.setup(); - try - { - base.testBasic(); - } - finally - { - base.teardown(); - } + return new InterProcessSemaphoreReadWrite(client, path); } }