Repository: curator
Updated Branches:
  refs/heads/CURATOR-2.0 c7df8e251 -> 6af5f367e


Make sure readValueAndNotifyListenersInBackground() is called after a 
connection problem


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3e159bdd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3e159bdd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3e159bdd

Branch: refs/heads/CURATOR-2.0
Commit: 3e159bddb91c6e8b4e9e27ccbf00e06b4f35638e
Parents: c7df8e2
Author: randgalt <[email protected]>
Authored: Mon Jul 10 11:05:57 2017 -0500
Committer: randgalt <[email protected]>
Committed: Mon Jul 10 11:21:57 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedValue.java   |  30 +++-
 .../recipes/shared/TestSharedCount.java         | 148 ++++++++++++++++++-
 2 files changed, 175 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..5478a8f 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, 
SharedValueReader
     private final byte[] seedValue;
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
     private final AtomicReference<VersionedValue<byte[]>> currentValue;
+    private final CuratorWatcher watcher;
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
+    private class SharedValueCuratorWatcher implements CuratorWatcher
     {
         @Override
         public void process(WatchedEvent event) throws Exception
@@ -68,7 +70,7 @@ public class SharedValue implements Closeable, 
SharedValueReader
                 readValueAndNotifyListenersInBackground();
             }
         }
-    };
+    }
 
     private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
     {
@@ -76,6 +78,18 @@ public class SharedValue implements Closeable, 
SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState 
newState)
         {
             notifyListenerOfStateChanged(newState);
+            if ( newState.isConnected() )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
         }
     };
 
@@ -96,6 +110,18 @@ public class SharedValue implements Closeable, 
SharedValueReader
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        this.watcher = new SharedValueCuratorWatcher();
+        currentValue = new AtomicReference<VersionedValue<byte[]>>(new 
VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, 
seedValue.length)));
+    }
+
+    @VisibleForTesting
+    protected SharedValue(CuratorFramework client, String path, byte[] 
seedValue, CuratorWatcher watcher)
+    {
+        this.client = client;
+        this.path = PathUtils.validatePath(path);
+        this.seedValue = Arrays.copyOf(seedValue, seedValue.length); // keep a private copy of the caller's array
+        // test hook: use the caller-supplied watcher instead of a new SharedValueCuratorWatcher
+        this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new 
 VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, 
 seedValue.length))); // initial value: a second copy of the seed, tagged UNINITIALIZED_VERSION
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..a6a32e9 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -167,10 +170,29 @@ public class TestSharedCount extends BaseClassForTests
             client.start();
             count.start();
 
+            final CountDownLatch setLatch = new CountDownLatch(3);
+            SharedCountListener listener = new SharedCountListener()
+            {
+                @Override
+                public void countHasChanged(SharedCountReader sharedCount, int 
newCount) throws Exception
+                {
+                    setLatch.countDown();
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    // nop
+                }
+            };
+            count.addListener(listener);
+
             Assert.assertTrue(count.trySetCount(1));
             Assert.assertTrue(count.trySetCount(2));
             Assert.assertTrue(count.trySetCount(10));
             Assert.assertEquals(count.getCount(), 10);
+
+            Assert.assertTrue(new Timing().awaitLatch(setLatch));
         }
         finally
         {
@@ -246,12 +268,30 @@ public class TestSharedCount extends BaseClassForTests
             Assert.assertTrue(count2.trySetCount(versionedValue, 20));
             timing.sleepABit();
 
+            final CountDownLatch setLatch = new CountDownLatch(2);
+            SharedCountListener listener = new SharedCountListener()
+            {
+                @Override
+                public void countHasChanged(SharedCountReader sharedCount, int 
newCount) throws Exception
+                {
+                    setLatch.countDown();
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    // nop
+                }
+            };
+            count1.addListener(listener);
             VersionedValue<Integer> versionedValue1 = 
count1.getVersionedValue();
             VersionedValue<Integer> versionedValue2 = 
count2.getVersionedValue();
             Assert.assertTrue(count2.trySetCount(versionedValue2, 30));
             Assert.assertFalse(count1.trySetCount(versionedValue1, 40));
+
             versionedValue1 = count1.getVersionedValue();
             Assert.assertTrue(count1.trySetCount(versionedValue1, 40));
+            Assert.assertTrue(timing.awaitLatch(setLatch));
         }
         finally
         {
@@ -368,6 +408,7 @@ public class TestSharedCount extends BaseClassForTests
 
             server.restart();
             Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+            Assert.assertEquals(numChangeEvents.get(), 1);
 
             sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
 
@@ -381,7 +422,9 @@ public class TestSharedCount extends BaseClassForTests
             }).forPath("/count");
             flushDone.await(5, TimeUnit.SECONDS);
 
-            Assert.assertEquals(2, numChangeEvents.get());
+            // CURATOR-311: when a Curator client's state became RECONNECTED, 
countHasChanged method is called back
+            // because the Curator client calls 
readValueAndNotifyListenersInBackground in 
SharedValue#ConnectionStateListener#stateChanged.
+            Assert.assertEquals(numChangeEvents.get(), 3);
         }
         finally
         {
@@ -389,4 +432,107 @@ public class TestSharedCount extends BaseClassForTests
             CloseableUtils.closeQuietly(curatorFramework);
         }
     }
+
+    @Test
+    public void testDisconnectReconnectWithMultipleClients() throws Exception
+    { // Two clients share one counter path; after each server restart the second client's count must match the value last set by the first.
+        CuratorFramework curatorFramework1 = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryNTimes(10, 500));
+        CuratorFramework curatorFramework2 = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryNTimes(10, 500));
+
+        curatorFramework1.start();
+        curatorFramework1.blockUntilConnected();
+        curatorFramework2.start();
+        curatorFramework2.blockUntilConnected();
+
+        final String sharedCountPath = "/count";
+        final int initialCount = 10;
+        SharedCount sharedCount1 = new SharedCount(curatorFramework1, 
sharedCountPath, initialCount);
+        SharedCount sharedCountWithFaultyWatcher = 
createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, 
initialCount);
+
+        class MySharedCountListener implements SharedCountListener
+        {
+            final public Phaser gotSuspendEvent = new Phaser(1); // one registered party, so each arrive() advances the phase by one
+            final public Phaser gotChangeEvent = new Phaser(1);
+            final public Phaser getReconnectEvent = new Phaser(1); // NOTE(review): named "get..." unlike the two "got..." phasers above
+            final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+            @Override
+            public void countHasChanged(SharedCountReader sharedCount, int 
newCount) throws Exception
+            {
+                numChangeEvents.incrementAndGet();
+                gotChangeEvent.arrive();
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState 
newState)
+            {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.arrive();
+                } else if (newState.isConnected()) {
+                    getReconnectEvent.arrive();
+                }
+            }
+        }
+
+        MySharedCountListener listener1 = new MySharedCountListener();
+        sharedCount1.addListener(listener1);
+        sharedCount1.start();
+        MySharedCountListener listener2 = new MySharedCountListener();
+        sharedCountWithFaultyWatcher.addListener(listener2);
+
+        try
+        {
+            sharedCount1.setCount(12);
+            
Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, 
TimeUnit.SECONDS), 1); // expects the phaser to have moved from phase 0 to 1, i.e. one change notification
+            Assert.assertEquals(sharedCount1.getCount(), 12);
+
+            Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10); // second counter is not started yet; expected to still report initialCount
+            // new counter with faultyWatcher start
+            sharedCountWithFaultyWatcher.start();
+
+            for (int i = 0; i < 10; i++) {
+                sharedCount1.setCount(13 + i);
+                Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+                server.restart();
+
+                
Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, 
TimeUnit.SECONDS), i + 1); // restart number i must move this phaser from phase i to i + 1
+                // CURATOR-311 makes the Curator client re-read the server's 
shared count value
+                // when the client's state becomes ConnectionState.RECONNECTED. 
Following tests ensures that.
+                
Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, 
TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 
13 + i);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount1);
+            CloseableUtils.closeQuietly(curatorFramework1);
+            CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+            CloseableUtils.closeQuietly(curatorFramework2);
+        }
+    }
+
+    private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework 
curatorFramework, String path, int val) { // test helper: meant to return a SharedCount whose watcher ignores every event
+
+        final CuratorWatcher faultyWatcher = new CuratorWatcher() {
+            @Override
+            public void process(WatchedEvent event) throws Exception {
+                // deliberately a no-op: every WatchedEvent is ignored
+            }
+        };
+
+        class FaultySharedValue extends SharedValue {
+            public FaultySharedValue(CuratorFramework client, String path, 
byte[] seedValue) {
+                super(client, path, seedValue, faultyWatcher); // four-argument constructor injects the no-op watcher
+            }
+        };
+
+        final SharedValue faultySharedValue = new 
FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val)); // NOTE(review): faultySharedValue is never referenced after this line
+        class FaultySharedCount extends SharedCount {
+            public FaultySharedCount(CuratorFramework client, String path, int 
val) {
+                super(client, path, val); // NOTE(review): only (client, path, val) is passed; nothing visible here hands faultySharedValue or faultyWatcher to the count - confirm the faulty watcher is actually in effect
+            }
+        };
+        return new FaultySharedCount(curatorFramework, path, val);
+    }
 }

Reply via email to