Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/174faef5 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/174faef5 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/174faef5 Branch: refs/heads/master Commit: 174faef5f0de10626c616d2a25eb9fb1e5572966 Parents: 0f5d10d 8c1c5ff Author: randgalt <[email protected]> Authored: Mon Jul 10 10:55:00 2017 -0500 Committer: randgalt <[email protected]> Committed: Mon Jul 10 10:55:00 2017 -0500 ---------------------------------------------------------------------- .../framework/recipes/shared/SharedCount.java | 5 + .../framework/recipes/shared/SharedValue.java | 27 ++++- .../recipes/shared/TestSharedCount.java | 116 ++++++++++++++++++- 3 files changed, 146 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java ---------------------------------------------------------------------- diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index 1a3d889,7e3f26a..68fd5b5 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@@ -94,9 -106,21 +107,21 @@@ public class SharedValue implements Clo */ public SharedValue(CuratorFramework client, String path, byte[] seedValue) { - this.client = client; + this.client = client.newWatcherRemoveCuratorFramework(); this.path = PathUtils.validatePath(path); this.seedValue = Arrays.copyOf(seedValue, seedValue.length); + this.watcher = new SharedValueCuratorWatcher(); + currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); + } + + @VisibleForTesting + protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher) + { + this.client = client; + this.path = PathUtils.validatePath(path); + this.seedValue = Arrays.copyOf(seedValue, seedValue.length); + // inject watcher for testing + this.watcher = watcher; currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); } http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java ---------------------------------------------------------------------- diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index 0690d6a,330c8f4..d7ebb6c --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@@ -23,9 -23,9 +23,10 @@@ import com.google.common.collect.Lists import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; + import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; @@@ -424,7 -392,115 +430,115 @@@ public class TestSharedCount extends Ba finally { CloseableUtils.closeQuietly(sharedCount); - CloseableUtils.closeQuietly(curatorFramework); + TestCleanState.closeAndTestClean(curatorFramework); } } + + + @Test + public void testDisconnectReconnectWithMultipleClients() throws Exception + { + CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500)); + CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500)); + + curatorFramework1.start(); + curatorFramework1.blockUntilConnected(); + curatorFramework2.start(); + curatorFramework2.blockUntilConnected(); + + final String sharedCountPath = "/count"; + final int initialCount = 10; + SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount); + SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount); + + class MySharedCountListener implements SharedCountListener + { + final public Phaser gotSuspendEvent = new Phaser(1); + final public Phaser gotChangeEvent = new Phaser(1); + final public Phaser getReconnectEvent = new Phaser(1); + final public AtomicInteger numChangeEvents = new AtomicInteger(0); + + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception + { + numChangeEvents.incrementAndGet(); + gotChangeEvent.arrive(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.SUSPENDED) { + gotSuspendEvent.arrive(); + } else if (newState == ConnectionState.RECONNECTED) { + getReconnectEvent.arrive(); + } + } + } + + MySharedCountListener listener1 = new MySharedCountListener(); + sharedCount1.addListener(listener1); + sharedCount1.start(); + MySharedCountListener listener2 = new MySharedCountListener(); + sharedCountWithFaultyWatcher.addListener(listener2); + + try + { + sharedCount1.setCount(12); + Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1); + Assert.assertEquals(sharedCount1.getCount(), 12); + + Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10); + // new counter with faultyWatcher start + sharedCountWithFaultyWatcher.start(); + + for (int i = 0; i < 10; i++) { + sharedCount1.setCount(13 + i); + Assert.assertEquals(sharedCount1.getCount(), 13 + i); + + server.restart(); + + Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1); + // CURATOR-311 introduces to Curator's client reading server's shared count value + // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that. + Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1); + Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i); + } + } + finally + { + CloseableUtils.closeQuietly(sharedCount1); + CloseableUtils.closeQuietly(curatorFramework1); + CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher); + CloseableUtils.closeQuietly(curatorFramework2); + } + } + + private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) { + + class FaultyCuratorWatcher implements CuratorWatcher { + @Override + public void process(WatchedEvent event) throws Exception { + // everything will be ignored + } + } + + final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher(); + + class FaultySharedValue extends SharedValue { + public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) { + super(client, path, seedValue, fautlyWatcher); + } + }; + + final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val)); + class FaultySharedCount extends SharedCount { + public FaultySharedCount(CuratorFramework client, String path, int val) { + super(client, path, faultySharedValue); + } + }; + return new FaultySharedCount(curatorFramework, path, val); + } + + }
