Repository: curator Updated Branches: refs/heads/CURATOR-2.0 c7df8e251 -> 6af5f367e
Make sure readValueAndNotifyListenersInBackground() is called after a connection problem Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3e159bdd Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3e159bdd Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3e159bdd Branch: refs/heads/CURATOR-2.0 Commit: 3e159bddb91c6e8b4e9e27ccbf00e06b4f35638e Parents: c7df8e2 Author: randgalt <[email protected]> Authored: Mon Jul 10 11:05:57 2017 -0500 Committer: randgalt <[email protected]> Committed: Mon Jul 10 11:21:57 2017 -0500 ---------------------------------------------------------------------- .../framework/recipes/shared/SharedValue.java | 30 +++- .../recipes/shared/TestSharedCount.java | 148 ++++++++++++++++++- 2 files changed, 175 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index 1f9df37..5478a8f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.shared; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; @@ -56,8 +57,9 @@ public class SharedValue implements Closeable, SharedValueReader private final byte[] seedValue; private final AtomicReference<State> state = 
new AtomicReference<State>(State.LATENT); private final AtomicReference<VersionedValue<byte[]>> currentValue; + private final CuratorWatcher watcher; - private final CuratorWatcher watcher = new CuratorWatcher() + private class SharedValueCuratorWatcher implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception @@ -68,7 +70,7 @@ public class SharedValue implements Closeable, SharedValueReader readValueAndNotifyListenersInBackground(); } } - }; + } private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @@ -76,6 +78,18 @@ public class SharedValue implements Closeable, SharedValueReader public void stateChanged(CuratorFramework client, ConnectionState newState) { notifyListenerOfStateChanged(newState); + if ( newState.isConnected() ) + { + try + { + readValueAndNotifyListenersInBackground(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.error("Could not read value after reconnect", e); + } + } } }; @@ -96,6 +110,18 @@ public class SharedValue implements Closeable, SharedValueReader this.client = client; this.path = PathUtils.validatePath(path); this.seedValue = Arrays.copyOf(seedValue, seedValue.length); + this.watcher = new SharedValueCuratorWatcher(); + currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); + } + + @VisibleForTesting + protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher) + { + this.client = client; + this.path = PathUtils.validatePath(path); + this.seedValue = Arrays.copyOf(seedValue, seedValue.length); + // inject watcher for testing + this.watcher = watcher; currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); } 
http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index 7939f6e..a6a32e9 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; @@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.WatchedEvent; import org.testng.Assert; import org.testng.annotations.Test; import java.util.List; @@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -167,10 +170,29 @@ public class TestSharedCount extends BaseClassForTests client.start(); count.start(); + final CountDownLatch setLatch = new CountDownLatch(3); + 
SharedCountListener listener = new SharedCountListener() + { + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception + { + setLatch.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + // nop + } + }; + count.addListener(listener); + Assert.assertTrue(count.trySetCount(1)); Assert.assertTrue(count.trySetCount(2)); Assert.assertTrue(count.trySetCount(10)); Assert.assertEquals(count.getCount(), 10); + + Assert.assertTrue(new Timing().awaitLatch(setLatch)); } finally { @@ -246,12 +268,30 @@ public class TestSharedCount extends BaseClassForTests Assert.assertTrue(count2.trySetCount(versionedValue, 20)); timing.sleepABit(); + final CountDownLatch setLatch = new CountDownLatch(2); + SharedCountListener listener = new SharedCountListener() + { + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception + { + setLatch.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + // nop + } + }; + count1.addListener(listener); VersionedValue<Integer> versionedValue1 = count1.getVersionedValue(); VersionedValue<Integer> versionedValue2 = count2.getVersionedValue(); Assert.assertTrue(count2.trySetCount(versionedValue2, 30)); Assert.assertFalse(count1.trySetCount(versionedValue1, 40)); + versionedValue1 = count1.getVersionedValue(); Assert.assertTrue(count1.trySetCount(versionedValue1, 40)); + Assert.assertTrue(timing.awaitLatch(setLatch)); } finally { @@ -368,6 +408,7 @@ public class TestSharedCount extends BaseClassForTests server.restart(); Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS)); + Assert.assertEquals(numChangeEvents.get(), 1); sharedCount.trySetCount(sharedCount.getVersionedValue(), 12); @@ -381,7 +422,9 @@ public class TestSharedCount extends BaseClassForTests }).forPath("/count"); flushDone.await(5, TimeUnit.SECONDS); - Assert.assertEquals(2, 
numChangeEvents.get()); + // CURATOR-311: when a Curator client's state became RECONNECTED, countHasChanged method is called back + // because the Curator client calls readValueAndNotifyListenersInBackground in SharedValue#ConnectionStateListener#stateChanged. + Assert.assertEquals(numChangeEvents.get(), 3); } finally { @@ -389,4 +432,107 @@ public class TestSharedCount extends BaseClassForTests CloseableUtils.closeQuietly(curatorFramework); } } + + @Test + public void testDisconnectReconnectWithMultipleClients() throws Exception + { + CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500)); + CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500)); + + curatorFramework1.start(); + curatorFramework1.blockUntilConnected(); + curatorFramework2.start(); + curatorFramework2.blockUntilConnected(); + + final String sharedCountPath = "/count"; + final int initialCount = 10; + SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount); + SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount); + + class MySharedCountListener implements SharedCountListener + { + final public Phaser gotSuspendEvent = new Phaser(1); + final public Phaser gotChangeEvent = new Phaser(1); + final public Phaser getReconnectEvent = new Phaser(1); + final public AtomicInteger numChangeEvents = new AtomicInteger(0); + + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception + { + numChangeEvents.incrementAndGet(); + gotChangeEvent.arrive(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.SUSPENDED) { + gotSuspendEvent.arrive(); + } else if (newState.isConnected()) { + getReconnectEvent.arrive(); + } + } + } + + 
MySharedCountListener listener1 = new MySharedCountListener(); + sharedCount1.addListener(listener1); + sharedCount1.start(); + MySharedCountListener listener2 = new MySharedCountListener(); + sharedCountWithFaultyWatcher.addListener(listener2); + + try + { + sharedCount1.setCount(12); + Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1); + Assert.assertEquals(sharedCount1.getCount(), 12); + + Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10); + // new counter with faultyWatcher start + sharedCountWithFaultyWatcher.start(); + + for (int i = 0; i < 10; i++) { + sharedCount1.setCount(13 + i); + Assert.assertEquals(sharedCount1.getCount(), 13 + i); + + server.restart(); + + Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1); + // CURATOR-311 makes the Curator client re-read the server's shared count value + // when the client's state becomes ConnectionState.RECONNECTED. The following assertions verify that. 
+ Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1); + Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i); + } + } + finally + { + CloseableUtils.closeQuietly(sharedCount1); + CloseableUtils.closeQuietly(curatorFramework1); + CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher); + CloseableUtils.closeQuietly(curatorFramework2); + } + } + + private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) { + + final CuratorWatcher faultyWatcher = new CuratorWatcher() { + @Override + public void process(WatchedEvent event) throws Exception { + // everything will be ignored + } + }; + + class FaultySharedValue extends SharedValue { + public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) { + super(client, path, seedValue, faultyWatcher); + } + }; + + final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val)); + class FaultySharedCount extends SharedCount { + public FaultySharedCount(CuratorFramework client, String path, int val) { + super(client, path, val); + } + }; + return new FaultySharedCount(curatorFramework, path, val); + } }
