Repository: curator
Updated Branches:
  refs/heads/master f57206d6b -> eb6ad402b


When the connection is lost, client.getZookeeperClient().getZooKeeper() needs to
be called periodically so that the ensemble provider can update the connection
string, etc.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7bab002f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7bab002f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7bab002f

Branch: refs/heads/master
Commit: 7bab002fe2f124bc426ea4c7e29103b39ea13d9d
Parents: f57206d
Author: randgalt <randg...@apache.org>
Authored: Tue Feb 20 13:36:22 2018 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue Feb 20 13:57:50 2018 -0500

----------------------------------------------------------------------
 .../curator/ensemble/TestEnsembleProvider.java  | 106 ++++++++++++
 .../framework/state/ConnectionStateManager.java |  12 ++
 .../ensemble/TestEnsembleProvider.java          | 162 +++++++++++++++++++
 .../apache/curator/test/BaseClassForTests.java  |   2 +-
 4 files changed, 281 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7bab002f/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java
 
b/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java
new file mode 100644
index 0000000..36f0fd7
--- /dev/null
+++ 
b/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.Semaphore;
+
+public class TestEnsembleProvider extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        Semaphore counter = new Semaphore(0);
+        final CuratorZookeeperClient client = new CuratorZookeeperClient(new 
CountingEnsembleProvider(counter), timing.session(), timing.connection(), null, 
new RetryOneTime(2));
+        try
+        {
+            client.start();
+            Assert.assertTrue(timing.acquireSemaphore(counter));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testAfterSessionExpiration() throws Exception
+    {
+        Semaphore counter = new Semaphore(0);
+        final CuratorZookeeperClient client = new CuratorZookeeperClient(new 
CountingEnsembleProvider(counter), timing.session(), timing.connection(), null, 
new RetryOneTime(2));
+        try
+        {
+            client.start();
+            Assert.assertTrue(timing.acquireSemaphore(counter));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    private class CountingEnsembleProvider implements EnsembleProvider
+    {
+        private final Semaphore getConnectionStringCounter;
+
+        public CountingEnsembleProvider(Semaphore getConnectionStringCounter)
+        {
+            this.getConnectionStringCounter = getConnectionStringCounter;
+        }
+
+        @Override
+        public void start()
+        {
+            // NOP
+        }
+
+        @Override
+        public String getConnectionString()
+        {
+            getConnectionStringCounter.release();
+            return server.getConnectString();
+        }
+
+        @Override
+        public void close()
+        {
+            // NOP
+        }
+
+        @Override
+        public void setConnectionString(String connectionString)
+        {
+            // NOP
+        }
+
+        @Override
+        public boolean updateServerListEnabled()
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/7bab002f/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 251baa9..0c8ddf8 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -315,6 +315,18 @@ public class ConnectionStateManager implements Closeable
                 }
             }
         }
+        else if ( currentConnectionState == ConnectionState.LOST )
+        {
+            try
+            {
+                // give ConnectionState.checkTimeouts() a chance to run, reset ensemble providers, etc.
+                client.getZookeeperClient().getZooKeeper();
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not get ZooKeeper", e);
+            }
+        }
     }
 
     private void setCurrentConnectionState(ConnectionState newConnectionState)

http://git-wip-us.apache.org/repos/asf/curator/blob/7bab002f/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java
new file mode 100644
index 0000000..73d94a5
--- /dev/null
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.ensemble;
+
+import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class TestEnsembleProvider extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+
+    @Test
+    public void testBasic()
+    {
+        Semaphore counter = new Semaphore(0);
+        final CuratorFramework client = newClient(counter);
+        try
+        {
+            client.start();
+            Assert.assertTrue(timing.acquireSemaphore(counter));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testAfterSessionExpiration() throws Exception
+    {
+        TestingServer oldServer = server;
+        Semaphore counter = new Semaphore(0);
+        final CuratorFramework client = newClient(counter);
+        try
+        {
+            client.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        lostLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+            server.stop();
+
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            counter.drainPermits();
+            for ( int i = 0; i < 5; ++i )
+            {
+                // the ensemble provider should still be called periodically when the connection is lost
+                Assert.assertTrue(timing.acquireSemaphore(counter), "Failed 
when i is: " + i);
+            }
+
+            server = new TestingServer();   // this changes the 
CountingEnsembleProvider's value for getConnectionString() - connection should 
notice this and recover
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(oldServer);
+        }
+    }
+
+    private CuratorFramework newClient(Semaphore counter)
+    {
+        return CuratorFrameworkFactory.builder()
+            .ensembleProvider(new CountingEnsembleProvider(counter))
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection())
+            .retryPolicy(new RetryOneTime(1))
+            .build();
+    }
+
+    private class CountingEnsembleProvider implements EnsembleProvider
+    {
+        private final Semaphore getConnectionStringCounter;
+
+        public CountingEnsembleProvider(Semaphore getConnectionStringCounter)
+        {
+            this.getConnectionStringCounter = getConnectionStringCounter;
+        }
+
+        @Override
+        public void start()
+        {
+            // NOP
+        }
+
+        @Override
+        public String getConnectionString()
+        {
+            getConnectionStringCounter.release();
+            return server.getConnectString();
+        }
+
+        @Override
+        public void close()
+        {
+            // NOP
+        }
+
+        @Override
+        public void setConnectionString(String connectionString)
+        {
+            // NOP
+        }
+
+        @Override
+        public boolean updateServerListEnabled()
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/7bab002f/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java 
b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 52446df..7c4af65 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class BaseClassForTests
 {
-    protected TestingServer server;
+    protected volatile TestingServer server;
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;

Reply via email to