Repository: curator Updated Branches: refs/heads/CURATOR-151 [created] 0c2a5a5b4
Added APIs for getting/setting shared values with versions for better utility Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0c2a5a5b Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0c2a5a5b Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0c2a5a5b Branch: refs/heads/CURATOR-151 Commit: 0c2a5a5b4f6cc07b9b1cd9c83ad77c3fa0170779 Parents: b66e9b6 Author: randgalt <[email protected]> Authored: Thu Oct 2 13:04:26 2014 -0500 Committer: randgalt <[email protected]> Committed: Thu Oct 2 13:04:26 2014 -0500 ---------------------------------------------------------------------- .../framework/recipes/shared/SharedCount.java | 28 ++- .../recipes/shared/SharedCountReader.java | 9 +- .../framework/recipes/shared/SharedValue.java | 138 ++++++++----- .../recipes/shared/SharedValueReader.java | 7 + .../recipes/shared/VersionedValue.java | 32 +++ .../recipes/shared/TestSharedCount.java | 195 +++++++++++++------ 6 files changed, 298 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java index b2d1218..49b2f2d 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.recipes.shared; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import org.apache.curator.framework.CuratorFramework; @@ -54,6 +55,13 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha return fromBytes(sharedValue.getValue()); } + @Override + public VersionedValue<Integer> getVersionedValue() + { + VersionedValue<byte[]> localValue = sharedValue.getVersionedValue(); + return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue())); + } + /** * Change the shared count value irrespective of its previous state * @@ -81,6 +89,23 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha return sharedValue.trySetValue(toBytes(newCount)); } + /** + * Changes the shared count only if its value has not changed since the version specified by + * newCount. If the count has changed, the value is not set and this client's view of the + * value is updated. i.e. if the count is not successful you can get the updated value + * by calling {@link #getCount()}. + * + * @param newCount the new value to attempt + * @return true if the change attempt was successful, false if not. If the change + * was not successful, {@link #getCount()} will return the updated value + * @throws Exception ZK errors, interruptions, etc. + */ + public boolean trySetCount(VersionedValue<Integer> newCount) throws Exception + { + VersionedValue<byte[]> copy = new VersionedValue<byte[]>(newCount.getVersion(), toBytes(newCount.getValue())); + return sharedValue.trySetValue(copy); + } + @Override public void addListener(SharedCountListener listener) { @@ -131,7 +156,8 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha sharedValue.close(); } - private static byte[] toBytes(int value) + @VisibleForTesting + static byte[] toBytes(int value) { byte[] bytes = new byte[4]; ByteBuffer.wrap(bytes).putInt(value); http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java index 3d3d6a4..cae31bb 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCountReader.java @@ -30,5 +30,12 @@ public interface SharedCountReader extends Listenable<SharedCountListener> * * @return count */ - int getCount(); + public int getCount(); + + /** + * Return the current count and version + * + * @return count and version + */ + public VersionedValue<Integer> getVersionedValue(); } http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index 5c4b53b..80fa53f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.shared; import com.google.common.base.Function; @@ -25,6 +26,7 @@ import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.PathUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.Stat; @@ -34,7 +36,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.utils.PathUtils; /** * Manages a shared value. All clients watching the same path will have the up-to-date @@ -42,14 +43,15 @@ import org.apache.curator.utils.PathUtils; */ public class SharedValue implements Closeable, SharedValueReader { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>(); - private final CuratorFramework client; - private final String path; - private final byte[] seedValue; - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - - private final CuratorWatcher watcher = new CuratorWatcher() + private final Logger log = LoggerFactory.getLogger(getClass()); + private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>(); + private final CuratorFramework client; + private final String path; + private final byte[] seedValue; + private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final AtomicReference<VersionedValue<byte[]>> currentValue; + + private final CuratorWatcher watcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception @@ -62,7 +64,7 @@ public class SharedValue implements Closeable, SharedValueReader } }; - private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) @@ -78,12 +80,9 @@ public class SharedValue implements Closeable, SharedValueReader CLOSED } - private volatile byte[] value; - private volatile Stat stat = new Stat(); - /** - * @param client the client - * @param path the shared path - i.e. where the shared value is stored + * @param client the client + * @param path the shared path - i.e. where the shared value is stored * @param seedValue the initial value for the value if/f the path has not yet been created */ public SharedValue(CuratorFramework client, String path, byte[] seedValue) @@ -91,13 +90,21 @@ public class SharedValue implements Closeable, SharedValueReader this.client = client; this.path = PathUtils.validatePath(path); this.seedValue = Arrays.copyOf(seedValue, seedValue.length); - value = seedValue; + currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(0, Arrays.copyOf(seedValue, seedValue.length))); } @Override public byte[] getValue() { - return Arrays.copyOf(value, value.length); + VersionedValue<byte[]> localCopy = currentValue.get(); + return Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length); + } + + @Override + public VersionedValue<byte[]> getVersionedValue() + { + VersionedValue<byte[]> localCopy = currentValue.get(); + return new VersionedValue<byte[]>(localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length)); } /** @@ -110,9 +117,10 @@ public class SharedValue implements Closeable, SharedValueReader { Preconditions.checkState(state.get() == State.STARTED, "not started"); + VersionedValue<byte[]> localCopy = currentValue.get(); client.setData().forPath(path, newValue); - stat.setVersion(stat.getVersion() + 1); - value = Arrays.copyOf(newValue, newValue.length); + + currentValue.set(new VersionedValue<byte[]>(localCopy.getVersion() + 1, Arrays.copyOf(newValue, newValue.length))); } /** @@ -132,9 +140,39 @@ public class SharedValue implements Closeable, SharedValueReader try { - client.setData().withVersion(stat.getVersion()).forPath(path, newValue); - stat.setVersion(stat.getVersion() + 1); - value = Arrays.copyOf(newValue, newValue.length); + VersionedValue<byte[]> localCopy = currentValue.get(); + client.setData().withVersion(localCopy.getVersion()).forPath(path, newValue); + currentValue.set(new VersionedValue<byte[]>(localCopy.getVersion() + 1, Arrays.copyOf(newValue, newValue.length))); + return true; + } + catch ( KeeperException.BadVersionException ignore ) + { + // ignore + } + + readValue(); + return false; + } + + /** + * Changes the shared value only if its value has not changed since the version specified by + * newValue. If the value has changed, the value is not set and this client's view of the + * value is updated. i.e. if the value is not successful you can get the updated value + * by calling {@link #getValue()}. + * + * @param newValue the new value to attempt + * @return true if the change attempt was successful, false if not. If the change + * was not successful, {@link #getValue()} will return the updated value + * @throws Exception ZK errors, interruptions, etc. + */ + public boolean trySetValue(VersionedValue<byte[]> newValue) throws Exception + { + Preconditions.checkState(state.get() == State.STARTED, "not started"); + + try + { + client.setData().withVersion(newValue.getVersion()).forPath(path, newValue.getValue()); + currentValue.set(new VersionedValue<byte[]>(newValue.getVersion() + 1, Arrays.copyOf(newValue.getValue(), newValue.getValue().length))); return true; } catch ( KeeperException.BadVersionException ignore ) @@ -162,7 +200,7 @@ public class SharedValue implements Closeable, SharedValueReader * * @throws Exception ZK errors, interruptions, etc. */ - public void start() throws Exception + public void start() throws Exception { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); @@ -189,48 +227,48 @@ public class SharedValue implements Closeable, SharedValueReader private synchronized void readValue() throws Exception { - Stat localStat = new Stat(); - byte[] bytes = client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path); - stat = localStat; - value = bytes; + Stat localStat = new Stat(); + byte[] bytes = client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path); + currentValue.set(new VersionedValue<byte[]>(localStat.getVersion(), Arrays.copyOf(bytes, bytes.length))); } private void notifyListeners() { + final byte[] localValue = getValue(); listeners.forEach - ( - new Function<SharedValueListener, Void>() - { - @Override - public Void apply(SharedValueListener listener) + ( + new Function<SharedValueListener, Void>() { - try - { - listener.valueHasChanged(SharedValue.this, value); - } - catch ( Exception e ) + @Override + public Void apply(SharedValueListener listener) { - log.error("From SharedValue listener", e); + try + { + listener.valueHasChanged(SharedValue.this, localValue); + } + catch ( Exception e ) + { + log.error("From SharedValue listener", e); + } + return null; } - return null; } - } - ); + ); } private void notifyListenerOfStateChanged(final ConnectionState newState) { listeners.forEach - ( - new Function<SharedValueListener, Void>() - { - @Override - public Void apply(SharedValueListener listener) + ( + new Function<SharedValueListener, Void>() { - listener.stateChanged(client, newState); - return null; + @Override + public Void apply(SharedValueListener listener) + { + listener.stateChanged(client, newState); + return null; + } } - } - ); + ); } } http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java index 93ed99c..e298cca 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValueReader.java @@ -33,6 +33,13 @@ public interface SharedValueReader public byte[] getValue(); /** + * Return the current version and value + * + * @return version/value + */ + public VersionedValue<byte[]> getVersionedValue(); + + /** * Returns the listenable * * @return listenable http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java new file mode 100644 index 0000000..ef4c9cc --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java @@ -0,0 +1,32 @@ +package org.apache.curator.framework.recipes.shared; + +import com.google.common.base.Preconditions; + +/** + * POJO for a version and a value + */ +public class VersionedValue<T> +{ + private final int version; + private final T value; + + /** + * @param version the version + * @param value the value (cannot be null) + */ + public VersionedValue(int version, T value) + { + this.version = version; + this.value = Preconditions.checkNotNull(value, "value cannot be null"); + } + + public int getVersion() + { + return version; + } + + public T getValue() + { + return value; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0c2a5a5b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index 08f7263..9fdf20f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -16,16 +16,18 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.shared; import com.google.common.collect.Lists; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; import java.util.List; @@ -42,85 +44,88 @@ import java.util.concurrent.TimeUnit; public class TestSharedCount extends BaseClassForTests { @Test - public void testMultiClients() throws Exception + public void testMultiClients() throws Exception { - final int CLIENT_QTY = 5; + final int CLIENT_QTY = 5; - List<Future<List<Integer>>> futures = Lists.newArrayList(); - final List<CuratorFramework> clients = new CopyOnWriteArrayList<CuratorFramework>(); + List<Future<List<Integer>>> futures = Lists.newArrayList(); + final List<CuratorFramework> clients = new CopyOnWriteArrayList<CuratorFramework>(); + final List<SharedCount> counts = new CopyOnWriteArrayList<SharedCount>(); try { - final CountDownLatch startLatch = new CountDownLatch(CLIENT_QTY); - final Semaphore semaphore = new Semaphore(0); - ExecutorService service = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Test-%d").build()); + final CountDownLatch startLatch = new CountDownLatch(CLIENT_QTY); + final Semaphore semaphore = new Semaphore(0); + ExecutorService service = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Test-%d").build()); for ( int i = 0; i < CLIENT_QTY; ++i ) { Future<List<Integer>> future = service.submit - ( - new Callable<List<Integer>>() - { - @Override - public List<Integer> call() throws Exception + ( + new Callable<List<Integer>>() { - final List<Integer> countList = Lists.newArrayList(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - clients.add(client); - client.start(); - - SharedCount count = new SharedCount(client, "/count", 10); - - final CountDownLatch latch = new CountDownLatch(1); - count.addListener - ( - new SharedCountListener() - { - @Override - public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception - { - if ( newCount < 0 ) - { - latch.countDown(); - } - else + @Override + public List<Integer> call() throws Exception + { + final List<Integer> countList = Lists.newArrayList(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + clients.add(client); + client.start(); + + SharedCount count = new SharedCount(client, "/count", 10); + counts.add(count); + + final CountDownLatch latch = new CountDownLatch(1); + count.addListener + ( + new SharedCountListener() { - countList.add(newCount); - } + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception + { + if ( newCount < 0 ) + { + latch.countDown(); + } + else + { + countList.add(newCount); + } + + semaphore.release(); + } - semaphore.release(); - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - } - } - ); - count.start(); - startLatch.countDown(); - latch.await(); - return countList; + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + } + ); + count.start(); + startLatch.countDown(); + latch.await(); + return countList; + } } - } - ); + ); futures.add(future); } CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); clients.add(client); client.start(); - + Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS)); SharedCount count = new SharedCount(client, "/count", 10); + counts.add(count); count.start(); List<Integer> countList = Lists.newArrayList(); - Random random = new Random(); + Random random = new Random(); for ( int i = 0; i < 100; ++i ) { Thread.sleep(random.nextInt(10)); - int next = random.nextInt(100); + int next = random.nextInt(100); countList.add(next); count.setCount(next); @@ -130,12 +135,16 @@ public class TestSharedCount extends BaseClassForTests for ( Future<List<Integer>> future : futures ) { - List<Integer> thisCountList = future.get(); + List<Integer> thisCountList = future.get(); Assert.assertEquals(thisCountList, countList); } } finally { + for ( SharedCount count : counts ) + { + CloseableUtils.closeQuietly(count); + } for ( CuratorFramework client : clients ) { CloseableUtils.closeQuietly(client); @@ -144,13 +153,13 @@ public class TestSharedCount extends BaseClassForTests } @Test - public void testSimple() throws Exception + public void testSimple() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - client.start(); + SharedCount count = new SharedCount(client, "/count", 0); try { - SharedCount count = new SharedCount(client, "/count", 0); + client.start(); count.start(); Assert.assertTrue(count.trySetCount(1)); @@ -160,7 +169,75 @@ public class TestSharedCount extends BaseClassForTests } finally { - client.close(); + CloseableUtils.closeQuietly(count); + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testSimpleVersioned() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + SharedCount count = new SharedCount(client, "/count", 0); + client.start(); + try + { + count.start(); + + Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(0, 1))); + Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(1, 5))); + Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(2, 10))); + Assert.assertEquals(count.getCount(), 10); + Assert.assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 20))); + + VersionedValue<Integer> versionedValue = count.getVersionedValue(); + Assert.assertTrue(count.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(), 100))); + versionedValue = count.getVersionedValue(); + client.setData().forPath("/count", SharedCount.toBytes(88)); + Assert.assertFalse(count.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(), 234))); + } + finally + { + CloseableUtils.closeQuietly(count); + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testMultiClientVersioned() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + SharedCount count1 = new SharedCount(client1, "/count", 0); + SharedCount count2 = new SharedCount(client2, "/count", 0); + try + { + client1.start(); + client2.start(); + count1.start(); + count2.start(); + + VersionedValue<Integer> versionedValue = count1.getVersionedValue(); + Assert.assertTrue(count1.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(), 10))); + timing.sleepABit(); + versionedValue = count2.getVersionedValue(); + Assert.assertTrue(count2.trySetCount(new VersionedValue<Integer>(versionedValue.getVersion(), 20))); + timing.sleepABit(); + + VersionedValue<Integer> versionedValue1 = count1.getVersionedValue(); + VersionedValue<Integer> versionedValue2 = count2.getVersionedValue(); + Assert.assertTrue(count2.trySetCount(new VersionedValue<Integer>(versionedValue2.getVersion(), 30))); + Assert.assertFalse(count1.trySetCount(new VersionedValue<Integer>(versionedValue1.getVersion(), 40))); + versionedValue1 = count1.getVersionedValue(); + Assert.assertTrue(count1.trySetCount(new VersionedValue<Integer>(versionedValue1.getVersion(), 40))); + } + finally + { + CloseableUtils.closeQuietly(count2); + CloseableUtils.closeQuietly(count1); + CloseableUtils.closeQuietly(client2); + CloseableUtils.closeQuietly(client1); } } }
