Repository: curator
Updated Branches:
  refs/heads/CURATOR-311 [created] 5de6b818a


CURATOR-311 - SharedValue could hold stall data after reconnecting


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8c1c5ffa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8c1c5ffa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8c1c5ffa

Branch: refs/heads/CURATOR-311
Commit: 8c1c5ffa287d22eaea18bf6f89a4a8bf6d9b871c
Parents: 35d2cc0
Author: Tsuyoshi Ozawa <[email protected]>
Authored: Wed Jan 11 20:30:46 2017 +0900
Committer: Tsuyoshi Ozawa <[email protected]>
Committed: Fri Feb 24 16:34:51 2017 +0900

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  27 ++++-
 .../recipes/shared/TestSharedCount.java         | 116 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..bdfa844 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -49,6 +49,11 @@ public class SharedCount implements Closeable, 
SharedCountReader, Listenable<Sha
         sharedValue = new SharedValue(client, path, toBytes(seedValue));
     }
 
+    protected SharedCount(CuratorFramework client, String path, SharedValue sv)
+    {
+        sharedValue = sv;
+    }
+
     @Override
     public int getCount()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..7e3f26a 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, 
SharedValueReader
     private final byte[] seedValue;
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
     private final AtomicReference<VersionedValue<byte[]>> currentValue;
+    private final CuratorWatcher watcher;
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
+    private class SharedValueCuratorWatcher implements CuratorWatcher
     {
         @Override
         public void process(WatchedEvent event) throws Exception
@@ -76,6 +78,17 @@ public class SharedValue implements Closeable, 
SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState 
newState)
         {
             notifyListenerOfStateChanged(newState);
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
         }
     };
 
@@ -96,6 +109,18 @@ public class SharedValue implements Closeable, 
SharedValueReader
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        this.watcher = new SharedValueCuratorWatcher();
+        currentValue = new AtomicReference<VersionedValue<byte[]>>(new 
VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, 
seedValue.length)));
+    }
+
+    @VisibleForTesting
+    protected SharedValue(CuratorFramework client, String path, byte[] 
seedValue, CuratorWatcher watcher)
+    {
+        this.client = client;
+        this.path = PathUtils.validatePath(path);
+        this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        // inject watcher for testing
+        this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new 
VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, 
seedValue.length)));
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..330c8f4 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -368,6 +371,7 @@ public class TestSharedCount extends BaseClassForTests
 
             server.restart();
             Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+            Assert.assertEquals(numChangeEvents.get(), 1);
 
             sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
 
@@ -381,7 +385,9 @@ public class TestSharedCount extends BaseClassForTests
             }).forPath("/count");
             flushDone.await(5, TimeUnit.SECONDS);
 
-            Assert.assertEquals(2, numChangeEvents.get());
+            // CURATOR-311: when a Curator client's state became RECONNECTED, 
countHasChanged method is called back
+            // because the Curator client calls 
readValueAndNotifyListenersInBackground in 
SharedValue#ConnectionStateListener#stateChanged.
+            Assert.assertEquals(numChangeEvents.get(), 3);
         }
         finally
         {
@@ -389,4 +395,112 @@ public class TestSharedCount extends BaseClassForTests
             CloseableUtils.closeQuietly(curatorFramework);
         }
     }
+
+
+    @Test
+    public void testDisconnectReconnectWithMultipleClients() throws Exception
+    {
+        CuratorFramework curatorFramework1 = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryNTimes(10, 500));
+        CuratorFramework curatorFramework2 = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryNTimes(10, 500));
+
+        curatorFramework1.start();
+        curatorFramework1.blockUntilConnected();
+        curatorFramework2.start();
+        curatorFramework2.blockUntilConnected();
+
+        final String sharedCountPath = "/count";
+        final int initialCount = 10;
+        SharedCount sharedCount1 = new SharedCount(curatorFramework1, 
sharedCountPath, initialCount);
+        SharedCount sharedCountWithFaultyWatcher = 
createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, 
initialCount);
+
+        class MySharedCountListener implements SharedCountListener
+        {
+            final public Phaser gotSuspendEvent = new Phaser(1);
+            final public Phaser gotChangeEvent = new Phaser(1);
+            final public Phaser getReconnectEvent = new Phaser(1);
+            final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+            @Override
+            public void countHasChanged(SharedCountReader sharedCount, int 
newCount) throws Exception
+            {
+                numChangeEvents.incrementAndGet();
+                gotChangeEvent.arrive();
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState 
newState)
+            {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.arrive();
+                } else if (newState == ConnectionState.RECONNECTED) {
+                    getReconnectEvent.arrive();
+                }
+            }
+        }
+
+        MySharedCountListener listener1 = new MySharedCountListener();
+        sharedCount1.addListener(listener1);
+        sharedCount1.start();
+        MySharedCountListener listener2 = new MySharedCountListener();
+        sharedCountWithFaultyWatcher.addListener(listener2);
+
+        try
+        {
+            sharedCount1.setCount(12);
+            
Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, 
TimeUnit.SECONDS), 1);
+            Assert.assertEquals(sharedCount1.getCount(), 12);
+
+            Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+            // new counter with faultyWatcher start
+            sharedCountWithFaultyWatcher.start();
+
+            for (int i = 0; i < 10; i++) {
+                sharedCount1.setCount(13 + i);
+                Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+                server.restart();
+
+                
Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, 
TimeUnit.SECONDS), i + 1);
+                // CURATOR-311 introduces to Curator's client reading server's 
shared count value
+                // when client's state gets ConnectionState.RECONNECTED. 
Following tests ensures that.
+                
Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, 
TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 
13 + i);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount1);
+            CloseableUtils.closeQuietly(curatorFramework1);
+            CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+            CloseableUtils.closeQuietly(curatorFramework2);
+        }
+    }
+
+    private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework 
curatorFramework, String path, int val) {
+
+        class FaultyCuratorWatcher implements CuratorWatcher {
+            @Override
+            public void process(WatchedEvent event) throws Exception {
+                // everything will be ignored
+            }
+        }
+
+        final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+
+        class FaultySharedValue extends SharedValue {
+            public FaultySharedValue(CuratorFramework client, String path, 
byte[] seedValue) {
+                super(client, path, seedValue, fautlyWatcher);
+            }
+        };
+
+        final SharedValue faultySharedValue = new 
FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+        class FaultySharedCount extends SharedCount {
+            public FaultySharedCount(CuratorFramework client, String path, int 
val) {
+                super(client, path, faultySharedValue);
+            }
+        };
+        return new FaultySharedCount(curatorFramework, path, val);
+    }
+
+
 }

Reply via email to