Repository: curator Updated Branches: refs/heads/CURATOR-88 6477a2f08 -> 79e55cf88
More testing - there are problems with r/w lock and threading. More to come. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/79e55cf8 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/79e55cf8 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/79e55cf8 Branch: refs/heads/CURATOR-88 Commit: 79e55cf882df32bc21490af7511f5f52f4566c24 Parents: 6477a2f Author: randgalt <randg...@apache.org> Authored: Wed Mar 5 07:27:47 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Wed Mar 5 07:27:47 2014 -0500 ---------------------------------------------------------------------- .../apache/curator/x/rest/api/TestLocks.java | 100 +++++++++++++- .../x/rest/support/InterProcessLockBridge.java | 8 +- .../InterProcessReadWriteLockBridge.java | 132 +++++++++++++++++++ 3 files changed, 230 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java index 7aa4585..168329e 100644 --- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java +++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java @@ -28,10 +28,12 @@ import org.apache.curator.x.rest.entities.Status; import org.apache.curator.x.rest.entities.StatusMessage; import org.apache.curator.x.rest.support.BaseClassForTests; import org.apache.curator.x.rest.support.InterProcessLockBridge; +import org.apache.curator.x.rest.support.InterProcessReadWriteLockBridge; import org.apache.curator.x.rest.support.StatusListener; import org.testng.Assert; import org.testng.annotations.Test; import java.util.List; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorCompletionService; @@ -41,6 +43,7 @@ import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class TestLocks extends BaseClassForTests @@ -48,8 +51,8 @@ public class TestLocks extends BaseClassForTests @Test public void test2Clients() throws Exception { - final InterProcessLock mutexForClient1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker); - final InterProcessLock mutexForClient2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker); + final InterProcessLock mutexForClient1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock"); + final InterProcessLock mutexForClient2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock"); final CountDownLatch latchForClient1 = new CountDownLatch(1); final CountDownLatch latchForClient2 = new CountDownLatch(1); @@ -187,7 +190,7 @@ public class TestLocks extends BaseClassForTests @Override public Object call() throws Exception { - InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker); + InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock"); lock.acquire(); try { @@ -228,8 +231,8 @@ public class TestLocks extends BaseClassForTests { final Timing timing = new Timing(); - final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker); - final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker); + final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock"); + final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock"); final Semaphore semaphore = new Semaphore(0); ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2)); @@ -281,7 +284,7 @@ public class TestLocks extends BaseClassForTests ExecutorService service = Executors.newCachedThreadPool(); for ( int i = 0; i < THREAD_QTY; ++i ) { - final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker); + final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock"); Future<Object> t = service.submit ( new Callable<Object>() @@ -324,4 +327,89 @@ public class TestLocks extends BaseClassForTests t.get(); } } + + @Test + public void testBasicReadWriteLock() throws Exception + { + final int CONCURRENCY = 8; + final int ITERATIONS = 100; + + final Random random = new Random(); + final AtomicInteger concurrentCount = new AtomicInteger(0); + final AtomicInteger maxConcurrentCount = new AtomicInteger(0); + final AtomicInteger writeCount = new AtomicInteger(0); + final AtomicInteger readCount = new AtomicInteger(0); + + List<Future<Void>> futures = Lists.newArrayList(); + ExecutorService service = Executors.newCachedThreadPool(); + for ( int i = 0; i < CONCURRENCY; ++i ) + { + Future<Void> future = service.submit + ( + new Callable<Void>() + { + @Override + public Void call() throws Exception + { + InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock"); + for ( int i = 0; i < ITERATIONS; ++i ) + { + if ( random.nextInt(100) < 10 ) + { + doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1); + writeCount.incrementAndGet(); + } + else + { + doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE); + readCount.incrementAndGet(); + } + } + return null; + } + } + ); + futures.add(future); + } + + for ( Future<Void> future : futures ) + { + future.get(); + } + + System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get()); + + Assert.assertTrue(writeCount.get() > 0); + Assert.assertTrue(readCount.get() > 0); + Assert.assertTrue(maxConcurrentCount.get() > 1); + } + + private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception + { + try + { + Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS)); + int localConcurrentCount; + synchronized(this) + { + localConcurrentCount = concurrentCount.incrementAndGet(); + if ( localConcurrentCount > maxConcurrentCount.get() ) + { + maxConcurrentCount.set(localConcurrentCount); + } + } + + Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount); + + Thread.sleep(random.nextInt(9) + 1); + } + finally + { + synchronized(this) + { + concurrentCount.decrementAndGet(); + lock.release(); + } + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java index 15a7a5c..1da27ba 100644 --- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java +++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java @@ -36,16 +36,16 @@ public class InterProcessLockBridge implements InterProcessLock private final Client restClient; private final SessionManager sessionManager; private final UriMaker uriMaker; + private final String path; private volatile String id = null; - private static final String PATH = "/lock"; - - public InterProcessLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker) + public InterProcessLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path) { this.restClient = restClient; this.sessionManager = sessionManager; this.uriMaker = uriMaker; + this.path = path; } @Override @@ -61,7 +61,7 @@ public class InterProcessLockBridge implements InterProcessLock URI uri = uriMaker.getMethodUri(LockResource.class, null); LockSpec lockSpec = new LockSpec(); - lockSpec.setPath(PATH); + lockSpec.setPath(path); lockSpec.setMaxWaitMs((int)unit.toMillis(time)); try { http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java new file mode 100644 index 0000000..ea21f98 --- /dev/null +++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.support; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.UniformInterfaceException; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.x.rest.api.ReadWriteLockResource; +import org.apache.curator.x.rest.entities.Id; +import org.apache.curator.x.rest.entities.LockSpec; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.util.concurrent.TimeUnit; + +public class InterProcessReadWriteLockBridge +{ + private final Client restClient; + private final SessionManager sessionManager; + private final UriMaker uriMaker; + private final String path; + + public InterProcessReadWriteLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path) + { + + this.restClient = restClient; + this.sessionManager = sessionManager; + this.uriMaker = uriMaker; + this.path = path; + } + + public InterProcessLock writeLock() + { + return new InternalLock(true); + } + + public InterProcessLock readLock() + { + return new InternalLock(false); + } + + private class InternalLock implements InterProcessLock + { + private final boolean writeLock; + private final ThreadLocal<String> id = new ThreadLocal<String>(); + + public InternalLock(boolean writeLock) + { + this.writeLock = writeLock; + } + + @Override + public void acquire() throws Exception + { + if ( !acquire(Integer.MAX_VALUE, TimeUnit.MILLISECONDS) ) + { + throw new Exception("Could not acquire"); + } + } + + @Override + public boolean acquire(long time, TimeUnit unit) throws Exception + { + if ( id.get() != null ) + { + throw new Exception("Already acquired in this thread"); + } + + URI uri = uriMaker.getMethodUri(ReadWriteLockResource.class, writeLock ? "acquireWriteLock" : "acquireReadLock"); + LockSpec lockSpec = new LockSpec(); + lockSpec.setPath(path); + lockSpec.setMaxWaitMs((int)unit.toMillis(time)); + + String id; + try + { + id = restClient.resource(uri).type(MediaType.APPLICATION_JSON_TYPE).post(Id.class, lockSpec).getId(); + } + catch ( UniformInterfaceException e ) + { + if ( e.getResponse().getStatus() == Response.Status.SERVICE_UNAVAILABLE.getStatusCode() ) + { + return false; + } + throw e; + } + + this.id.set(id); + sessionManager.addEntry(uriMaker.getLocalhost(), id, null); + return true; + } + + @Override + public void release() throws Exception + { + String localId = id.get(); + if ( localId == null ) + { + throw new Exception("Not acquired in this thread"); + } + + URI uri = uriMaker.getMethodUri(ReadWriteLockResource.class, null); + restClient.resource(uri).path(localId).delete(); + + sessionManager.removeEntry(uriMaker.getLocalhost(), localId); + id.set(null); + } + + @Override + public boolean isAcquiredInThisProcess() + { + return id.get() != null; + } + } +}