This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 96f186abc3e4977eaac580054bd8a3cdfa79d22a
Author: randgalt <[email protected]>
AuthorDate: Thu Feb 7 12:10:52 2019 -0500

    CURATOR-505 - interim checking - refactoring, simplifications, more 
testing, and documentation
---
 .../apache/curator/framework/CuratorFramework.java | 10 +++
 .../curator/framework/CuratorFrameworkFactory.java |  2 +-
 .../framework/imps/CuratorFrameworkImpl.java       | 13 +++-
 .../curator/framework/state/CircuitBreaker.java    | 24 +++---
 .../CircuitBreakingConnectionStateListener.java    | 11 ++-
 .../state/ConnectionStateListenerDecorator.java    |  6 +-
 .../framework/state/TestCircuitBreaker.java        | 42 ++++++++---
 ...TestCircuitBreakingConnectionStateListener.java | 86 ++++++++++++++++++++--
 .../framework/recipes/leader/TestLeaderLatch.java  | 28 +++----
 src/site/confluence/errors.confluence              |  3 +-
 src/site/confluence/utilities.confluence           | 44 +++++++++++
 11 files changed, 214 insertions(+), 55 deletions(-)

diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 6513716..2657781 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -367,4 +368,13 @@ public interface CuratorFramework extends Closeable
      * @return decorated listener
      */
     ConnectionStateListener 
decorateConnectionStateListener(ConnectionStateListener actual);
+
+    /**
+     * Returns a facade of the current instance that uses the given connection 
state listener
+     * decorator instead of the configured one
+     *
+     * @param newDecorator decorator to use
+     * @return facade
+     */
+    CuratorFramework 
usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator 
newDecorator);
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 0cc6e0d..283a093 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -508,7 +508,7 @@ public class CuratorFrameworkFactory
          * @return this
          * @since 4.2.0
          */
-        public Builder 
connectionStateListenerFactory(ConnectionStateListenerDecorator 
connectionStateListenerDecorator)
+        public Builder 
connectionStateListenerDecorator(ConnectionStateListenerDecorator 
connectionStateListenerDecorator)
         {
             this.connectionStateListenerDecorator = 
Objects.requireNonNull(connectionStateListenerDecorator, 
"connectionStateListenerFactory cannot be null");
             return this;
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 81cae74..f210021 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -236,6 +236,11 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
 
     protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
     {
+        this(parent, parent.connectionStateListenerDecorator);
+    }
+
+    private CuratorFrameworkImpl(CuratorFrameworkImpl parent, 
ConnectionStateListenerDecorator connectionStateListenerDecorator)
+    {
         client = parent.client;
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
@@ -260,7 +265,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
-        connectionStateListenerDecorator = 
parent.connectionStateListenerDecorator;
+        this.connectionStateListenerDecorator = 
connectionStateListenerDecorator;
     }
 
     @Override
@@ -599,6 +604,12 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         return connectionStateListenerDecorator.decorateListener(this, actual);
     }
 
+    @Override
+    public CuratorFramework 
usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator 
newDecorator)
+    {
+        return new CuratorFrameworkImpl(this, newDecorator);
+    }
+
     ACLProvider getAclProvider()
     {
         return aclProvider;
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
index ad48a15..504edbc 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
@@ -14,7 +14,7 @@ class CircuitBreaker
 
     private boolean isOpen = false;
     private int retryCount = 0;
-    private long openStartNanos = 0;
+    private long startNanos = 0;
 
     CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
     {
@@ -41,13 +41,13 @@ class CircuitBreaker
 
         isOpen = true;
         retryCount = 0;
-        openStartNanos = System.nanoTime();
-        if ( !tryToRetry(completion) )
+        startNanos = System.nanoTime();
+        if ( tryToRetry(completion) )
         {
-            close();
-            return false;
+            return true;
         }
-        return true;
+        close();
+        return false;
     }
 
     boolean tryToRetry(Runnable completion)
@@ -59,13 +59,13 @@ class CircuitBreaker
 
         long[] sleepTimeNanos = new long[]{0L};
         RetrySleeper retrySleeper = (time, unit) -> sleepTimeNanos[0] = 
unit.toNanos(time);
-        if ( !retryPolicy.allowRetry(retryCount, System.nanoTime() - 
openStartNanos, retrySleeper) )
+        if ( retryPolicy.allowRetry(retryCount, System.nanoTime() - 
startNanos, retrySleeper) )
         {
-            return false;
+            ++retryCount;
+            service.schedule(completion, sleepTimeNanos[0], 
TimeUnit.NANOSECONDS);
+            return true;
         }
-        ++retryCount;
-        service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
-        return true;
+        return false;
     }
 
     boolean close()
@@ -73,7 +73,7 @@ class CircuitBreaker
         boolean wasOpen = isOpen;
         retryCount = 0;
         isOpen = false;
-        openStartNanos = 0;
+        startNanos = 0;
         return wasOpen;
     }
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
index 12efad9..dba651a 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
@@ -13,7 +13,7 @@ import java.util.concurrent.ScheduledExecutorService;
  *     A decorator/proxy for connection state listeners that adds circuit 
breaking behavior. During network
  *     outages ZooKeeper can become very noisy sending 
connection/disconnection events in rapid succession.
  *     Curator recipes respond to these messages by resetting state, etc. E.g. 
LeaderLatch must delete
- *     its lock node and try to recreated it in order to try to re-obtain 
leadership, etc.
+ *     its lock node and try to recreate it in order to try to re-obtain 
leadership, etc.
  * </p>
  *
  * <p>
@@ -114,7 +114,7 @@ public class CircuitBreakingConnectionStateListener 
implements ConnectionStateLi
                 log.debug("Could not open circuit breaker. State: {}", 
newState);
             }
         }
-        callListener(circuitInitialState);
+        callListener(newState);
     }
 
     private synchronized void handleOpenStateChange(ConnectionState newState)
@@ -126,11 +126,10 @@ public class CircuitBreakingConnectionStateListener 
implements ConnectionStateLi
         }
         else
         {
-            circuitLostHasBeenSent = true;
-            circuitInitialState = ConnectionState.LOST;
-            circuitLastState = newState;
             log.debug("Circuit is open. State changed to LOST. Sending to 
listener.");
-            callListener(circuitInitialState);
+            circuitLostHasBeenSent = true;
+            circuitLastState = circuitInitialState = ConnectionState.LOST;
+            callListener(ConnectionState.LOST);
         }
     }
 
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
index 0f11c46..0ac808b 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
@@ -17,14 +17,14 @@ import java.util.concurrent.ScheduledExecutorService;
  * <code><pre>
  * CuratorFramework client ...
  * ConnectionStateListener listener = ...
- * ConnectionStateListener wrappedListener = 
client.wrapConnectionStateListener(listener);
+ * ConnectionStateListener decoratedListener = 
client.decorateConnectionStateListener(listener);
  *
  * ...
  *
- * client.getConnectionStateListenable().addListener(wrappedListener);
+ * client.getConnectionStateListenable().addListener(decoratedListener);
  *
  * // later, to remove...
- * client.getConnectionStateListenable().removeListener(wrappedListener);
+ * client.getConnectionStateListenable().removeListener(decoratedListener);
  * </pre></code>
  * </p>
  */
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
index 77ec20f..e2daa96 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
@@ -1,7 +1,9 @@
 package org.apache.curator.framework.state;
 
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -12,23 +14,30 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestCircuitBreaker
 {
+    private Duration[] lastDelay = new Duration[]{Duration.ZERO};
+    private ScheduledThreadPoolExecutor service = new 
ScheduledThreadPoolExecutor(1)
+    {
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit)
+        {
+            lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
+            command.run();
+            return null;
+        }
+    };
+
+    @AfterClass
+    public void tearDown()
+    {
+        service.shutdownNow();
+    }
+
     @Test
     public void testBasic()
     {
         final int retryQty = 1;
         final Duration delay = Duration.ofSeconds(10);
 
-        Duration[] lastDelay = new Duration[]{Duration.ZERO};
-        ScheduledThreadPoolExecutor service = new 
ScheduledThreadPoolExecutor(1)
-        {
-            @Override
-            public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit)
-            {
-                lastDelay[0] = Duration.of(unit.toNanos(delay), 
ChronoUnit.NANOS);
-                command.run();
-                return null;
-            }
-        };
         CircuitBreaker circuitBreaker = new CircuitBreaker(new 
RetryNTimes(retryQty, (int)delay.toMillis()), service);
         AtomicInteger counter = new AtomicInteger(0);
 
@@ -46,4 +55,15 @@ public class TestCircuitBreaker
         Assert.assertEquals(circuitBreaker.getRetryCount(), 0);
         Assert.assertFalse(circuitBreaker.close());
     }
+
+    @Test
+    public void testVariousOpenRetryFails()
+    {
+        CircuitBreaker circuitBreaker = new CircuitBreaker(new 
RetryForever(1), service);
+        Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
+        Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
+        Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
+        Assert.assertTrue(circuitBreaker.close());
+        Assert.assertFalse(circuitBreaker.close());
+    }
 }
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
index 36a1954..1712eed 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
@@ -8,7 +8,8 @@ import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.Timing2;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -20,7 +21,7 @@ public class TestCircuitBreakingConnectionStateListener
     private final CuratorFramework dummyClient = 
CuratorFrameworkFactory.newClient("foo", new RetryOneTime(1));
     private final Timing2 timing = new Timing2();
     private final Timing2 retryTiming = timing.multiple(.25);
-    private final ScheduledThreadPoolExecutor service = new 
ScheduledThreadPoolExecutor(1);
+    private volatile ScheduledThreadPoolExecutor service;
 
     private static class RecordingListener implements ConnectionStateListener
     {
@@ -49,7 +50,13 @@ public class TestCircuitBreakingConnectionStateListener
         }
     }
 
-    @AfterClass
+    @BeforeMethod
+    public void setup()
+    {
+        service = new ScheduledThreadPoolExecutor(1);
+    }
+
+    @AfterMethod
     public void tearDown()
     {
         service.shutdownNow();
@@ -60,7 +67,7 @@ public class TestCircuitBreakingConnectionStateListener
     {
         RecordingListener recordingListener = new RecordingListener();
         TestRetryPolicy retryPolicy = new TestRetryPolicy();
-        final CircuitBreakingConnectionStateListener listener = new 
CircuitBreakingConnectionStateListener(dummyClient, recordingListener, 
retryPolicy, service);
+        CircuitBreakingConnectionStateListener listener = new 
CircuitBreakingConnectionStateListener(dummyClient, recordingListener, 
retryPolicy, service);
 
         listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
         
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.RECONNECTED);
@@ -94,8 +101,11 @@ public class TestCircuitBreakingConnectionStateListener
         TestRetryPolicy retryPolicy = new TestRetryPolicy();
         CircuitBreakingConnectionStateListener listener = new 
CircuitBreakingConnectionStateListener(dummyClient, recordingListener, 
retryPolicy, service);
 
-        listener.stateChanged(dummyClient, ConnectionState.LOST);
-        listener.stateChanged(dummyClient, ConnectionState.LOST);   // second 
LOST ignored
+        synchronized(listener)  // don't let retry policy run while we're 
pushing state changes
+        {
+            listener.stateChanged(dummyClient, ConnectionState.LOST);
+            listener.stateChanged(dummyClient, ConnectionState.LOST);   // 
second LOST ignored
+        }
         
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.LOST);
         Assert.assertTrue(recordingListener.stateChanges.isEmpty());
 
@@ -112,6 +122,7 @@ public class TestCircuitBreakingConnectionStateListener
 
         listener.stateChanged(dummyClient, ConnectionState.LOST);
         
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.LOST);
+        Assert.assertFalse(listener.isOpen());
         listener.stateChanged(dummyClient, ConnectionState.LOST);
         
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.LOST);
         Assert.assertFalse(listener.isOpen());
@@ -122,7 +133,7 @@ public class TestCircuitBreakingConnectionStateListener
     {
         RecordingListener recordingListener = new RecordingListener();
         RetryPolicy retryOnce = new RetryOneTime(retryTiming.milliseconds());
-        final CircuitBreakingConnectionStateListener listener = new 
CircuitBreakingConnectionStateListener(dummyClient, recordingListener, 
retryOnce, service);
+        CircuitBreakingConnectionStateListener listener = new 
CircuitBreakingConnectionStateListener(dummyClient, recordingListener, 
retryOnce, service);
 
         synchronized(listener)  // don't let retry policy run while we're 
pushing state changes
         {
@@ -134,4 +145,65 @@ public class TestCircuitBreakingConnectionStateListener
         
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.SUSPENDED);
         Assert.assertFalse(listener.isOpen());
     }
+
+    @Test
+    public void testSuspendedToLostRatcheting() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        RetryPolicy retryInfinite = new RetryForever(Integer.MAX_VALUE);
+        CircuitBreakingConnectionStateListener listener = new 
CircuitBreakingConnectionStateListener(dummyClient, recordingListener, 
retryInfinite, service);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        Assert.assertFalse(listener.isOpen());
+        
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.RECONNECTED);
+
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(listener.isOpen());
+        
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.SUSPENDED);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), 
ConnectionState.LOST);
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+    }
 }
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 439f6c8..bf2abd4 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -64,18 +64,18 @@ public class TestLeaderLatch extends BaseClassForTests
     public void testWithCircuitBreaker() throws Exception
     {
         Timing2 timing = new Timing2();
-        ConnectionStateListenerDecorator factory = 
ConnectionStateListenerDecorator.circuitBreaking(new 
RetryForever(timing.milliseconds()));
+        ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreaking(new 
RetryForever(timing.multiple(2).milliseconds()));
         try ( CuratorFramework client = CuratorFrameworkFactory.builder()
             .connectString(server.getConnectString())
             .retryPolicy(new RetryOneTime(1))
-            .connectionStateListenerFactory(factory)
+            .connectionStateListenerDecorator(decorator)
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
             .build() )
         {
             client.start();
             AtomicInteger resetCount = new AtomicInteger(0);
-            LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+            try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
             {
                 @Override
                 void reset() throws Exception
@@ -83,18 +83,20 @@ public class TestLeaderLatch extends BaseClassForTests
                     resetCount.incrementAndGet();
                     super.reset();
                 }
-            };
-            latch.start();
-            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
-
-            for ( int i = 0; i < 5; ++i )
+            } )
             {
-                server.stop();
-                server.restart();
-                timing.sleepABit();
+                latch.start();
+                
Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
+
+                for ( int i = 0; i < 5; ++i )
+                {
+                    server.stop();
+                    server.restart();
+                    timing.sleepABit();
+                }
+                
Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
+                Assert.assertEquals(resetCount.get(), 2);
             }
-            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
-            Assert.assertEquals(resetCount.get(), 2);
         }
     }
 
diff --git a/src/site/confluence/errors.confluence 
b/src/site/confluence/errors.confluence
index b4f6643..97f23fd 100644
--- a/src/site/confluence/errors.confluence
+++ b/src/site/confluence/errors.confluence
@@ -19,7 +19,8 @@ in a retry mechanism. Thus, the following guarantees can be 
made:
 h2. Notifications
 Curator exposes several listenable interfaces for clients to monitor the state 
of the ZooKeeper connection.
 
-{{ConnectionStateListener}} is called when there are connection disruptions. 
Clients can monitor these changes and take
+{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details 
on properly decorating listeners) is called when there are connection
+disruptions. Clients can monitor these changes and take
 appropriate action. These are the possible state changes:
 
 |CONNECTED|Sent for the first successful connection to the server. NOTE: You 
will only get one of these messages for any CuratorFramework instance.|
diff --git a/src/site/confluence/utilities.confluence 
b/src/site/confluence/utilities.confluence
index 3a62fa5..1971c3c 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -14,6 +14,50 @@ Various static methods to help with using ZooKeeper ZNode 
paths:
 * getSortedChildren: Return the children of the given path sorted by sequence 
number
 * makePath: Given a parent path and a child node, create a combined full path
 
+h2. Circuit Breaking ConnectionStateListener
+
+During network outages ZooKeeper can become very noisy sending 
connection/disconnection events in rapid succession. Curator recipes respond to 
these
+messages by resetting state, etc. E.g. LeaderLatch must delete its lock node 
and try to recreate it in order to try to re\-obtain leadership, etc.
+
+This noisy herding can be avoided by using the circuit breaking listener 
decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes 
"open"
+(based on the provided RetryPolicy) and will ignore future connection state 
changes until RetryPolicy timeout has elapsed. Note: however, if the connection
+goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST 
state is sent.
+
+When the circuit decorator is closed, all connection state changes are 
forwarded to the managed listener. When the first disconnected state is 
received, the
+circuit becomes open. The state change that caused the circuit to open is sent 
to the managed listener and the RetryPolicy will be used to get a delay amount.
+While the delay is active, the decorator will store state changes but will not 
forward them to the managed listener (except, however, the first time the state
+changes from SUSPENDED to LOST). When the delay elapses, if the connection has 
been restored, the circuit closes and forwards the new state to the managed 
listener.
+If the connection has not been restored, the RetryPolicy is checked again. If 
the RetryPolicy indicates another retry is allowed the process repeats. If, 
however,
+the RetryPolicy indicates that retries are exhausted then the circuit closes 
\- if the current state is different than the state that caused the circuit to 
open it is
+forwarded to the managed listener.
+
+You can enable the Circuit Breaking ConnectionStateListener during creation of 
your CuratorFramework instance. All Curator recipes will decorate
+their ConnectionStateListeners using the configured decorator. E.g.
+
+{code}
+ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreaking(...);
+CuratorFramework client = CuratorFrameworkFactory.builder()
+    ...
+    .connectionStateListenerDecorator(decorator)
+    ...
+    .build();
+{code}
+
+If you are setting a ConnectionStateListener you should always "decorate" it 
by calling {{decorateConnectionStateListener()}}.
+
+{code}
+CuratorFramework client ...
+ConnectionStateListener listener = ...
+ConnectionStateListener decoratedListener = 
client.decorateConnectionStateListener(listener);
+
+...
+
+client.getConnectionStateListenable().addListener(decoratedListener);
+
+// later, to remove...
+client.getConnectionStateListenable().removeListener(decoratedListener);
+{code}
+
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using 
Curator locks safer:

Reply via email to