Repository: curator Updated Branches: refs/heads/CURATOR-344 [created] 022de3921
CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/022de392 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/022de392 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/022de392 Branch: refs/heads/CURATOR-344 Commit: 022de3921a120c6f86cc6e21442327cc04b66cd2 Parents: ef33ccb Author: gtully <gary.tu...@gmail.com> Authored: Thu Aug 18 19:34:10 2016 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Aug 30 13:09:56 2016 +0100 ---------------------------------------------------------------------- .../framework/recipes/shared/SharedValue.java | 24 ++++- .../recipes/shared/TestSharedCount.java | 106 +++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index dddc471..1f9df37 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -22,6 +22,8 @@ package org.apache.curator.framework.recipes.shared; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; @@ -30,6 +32,7 @@ import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader @Override public void process(WatchedEvent event) throws Exception { - if ( state.get() == State.STARTED ) + if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None ) { - readValue(); - notifyListeners(); + // don't block event thread in possible retry + readValueAndNotifyListenersInBackground(); } } }; @@ -248,6 +251,21 @@ public class SharedValue implements Closeable, SharedValueReader updateValue(localStat.getVersion(), bytes); } + private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + if (event.getResultCode() == KeeperException.Code.OK.intValue()) { + updateValue(event.getStat().getVersion(), event.getData()); + notifyListeners(); + } + } + }; + + private void readValueAndNotifyListenersInBackground() throws Exception + { + client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path); + } + private void notifyListeners() { final byte[] localValue = getValue(); http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index a1f4d8c..7939f6e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -23,7 +23,11 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; @@ -40,6 +44,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class TestSharedCount extends BaseClassForTests { @@ -283,4 +288,105 @@ public class TestSharedCount extends BaseClassForTests CloseableUtils.closeQuietly(client1); } } + + + @Test + public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception + { + final CountDownLatch gotSuspendEvent = new CountDownLatch(1); + + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000)); + curatorFramework.start(); + curatorFramework.blockUntilConnected(); + + SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10); + sharedCount.start(); + + curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.SUSPENDED) { + gotSuspendEvent.countDown(); + } + } + }); + + try + { + server.stop(); + // if watcher goes into 10second retry loop we won't get timely notification + Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS)); + } + finally + { + CloseableUtils.closeQuietly(sharedCount); + CloseableUtils.closeQuietly(curatorFramework); + } + } + + @Test + public void testDisconnectReconnectEventDoesNotFireValueWatcher() throws Exception + { + final CountDownLatch gotSuspendEvent = new CountDownLatch(1); + final CountDownLatch gotChangeEvent = new CountDownLatch(1); + final CountDownLatch getReconnectEvent = new CountDownLatch(1); + + final AtomicInteger numChangeEvents = new AtomicInteger(0); + + + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500)); + curatorFramework.start(); + curatorFramework.blockUntilConnected(); + + SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10); + + sharedCount.addListener(new SharedCountListener() { + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { + numChangeEvents.incrementAndGet(); + gotChangeEvent.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.SUSPENDED) { + gotSuspendEvent.countDown(); + } else if (newState == ConnectionState.RECONNECTED) { + getReconnectEvent.countDown(); + } + } + }); + sharedCount.start(); + + try + { + sharedCount.setCount(11); + Assert.assertTrue(gotChangeEvent.await(2, TimeUnit.SECONDS)); + + server.stop(); + Assert.assertTrue(gotSuspendEvent.await(2, TimeUnit.SECONDS)); + + server.restart(); + Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS)); + + sharedCount.trySetCount(sharedCount.getVersionedValue(), 12); + + // flush background task queue + final CountDownLatch flushDone = new CountDownLatch(1); + curatorFramework.getData().inBackground(new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + flushDone.countDown(); + } + }).forPath("/count"); + flushDone.await(5, TimeUnit.SECONDS); + + Assert.assertEquals(2, numChangeEvents.get()); + } + finally + { + CloseableUtils.closeQuietly(sharedCount); + CloseableUtils.closeQuietly(curatorFramework); + } + } }