This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch fix-circuit-breaker
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 5fcf58149635c6f4733cdcc78c122bc1bcb51407
Author: randgalt <[email protected]>
AuthorDate: Sun Jul 28 01:01:13 2019 -0500

    wip
---
 .../curator/framework/listen/ListenerManager.java  |  5 ++
 .../framework/listen/StandardListenerManager.java  |  2 +-
 ...tenerManager.java => UnaryListenerManager.java} | 26 +-----
 .../curator/framework/state/CircuitBreaker.java    | 20 ++++-
 .../CircuitBreakingConnectionStateListener.java    | 35 ++++++--
 .../framework/state/CircuitBreakingManager.java    | 91 ++++++++++++++++++++
 .../state/ConnectionStateListenerDecorator.java    | 57 +++++++++++--
 .../framework/state/ConnectionStateManager.java    |  8 +-
 .../framework/state/TestCircuitBreaker.java        |  6 +-
 .../framework/recipes/leader/LeaderLatch.java      |  3 +-
 .../framework/recipes/leader/TestLeaderLatch.java  | 97 +++++++++++++++++-----
 src/site/confluence/utilities.confluence           | 16 ++--
 12 files changed, 291 insertions(+), 75 deletions(-)

diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
index cab0426..85bc8f9 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
@@ -41,4 +41,9 @@ public interface ListenerManager<K, V> extends Listenable<K>
      * @param function function to call for each listener
      */
     void forEach(Consumer<V> function);
+
+    default boolean isEmpty()
+    {
+        return size() == 0;
+    }
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index e07fe47..8213967 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -26,7 +26,7 @@ import java.util.function.UnaryOperator;
 /**
  * Non mapping version of a listener container
  */
-public class StandardListenerManager<T> implements ListenerManager<T, T>
+public class StandardListenerManager<T> implements UnaryListenerManager<T>
 {
     private final ListenerManager<T, T> container;
 
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java
similarity index 63%
copy from 
curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
copy to 
curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java
index cab0426..54497f4 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java
@@ -18,27 +18,9 @@
  */
 package org.apache.curator.framework.listen;
 
-import java.util.function.Consumer;
-
-public interface ListenerManager<K, V> extends Listenable<K>
+/**
+ * A {@link ListenerManager} that doesn't do any mapping
+ */
+public interface UnaryListenerManager<T> extends ListenerManager<T, T>
 {
-    /**
-     * Remove all listeners
-     */
-    void clear();
-
-    /**
-     * Return the number of listeners
-     *
-     * @return number
-     */
-    int size();
-
-    /**
-     * Utility - apply the given function to each listener. The function 
receives
-     * the listener as an argument.
-     *
-     * @param function function to call for each listener
-     */
-    void forEach(Consumer<V> function);
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
index 03e44f8..c207128 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
@@ -20,12 +20,12 @@ package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.RetrySleeper;
+import org.apache.curator.utils.ThreadUtils;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-// must be guarded by sync
 class CircuitBreaker
 {
     private final RetryPolicy retryPolicy;
@@ -35,12 +35,18 @@ class CircuitBreaker
     private int retryCount = 0;
     private long startNanos = 0;
 
-    CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    static CircuitBreaker build(RetryPolicy retryPolicy)
     {
-        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy 
cannot be null");
-        this.service = Objects.requireNonNull(service, "service cannot be 
null");
+        return new CircuitBreaker(retryPolicy, 
ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
+    }
+
+    static CircuitBreaker build(RetryPolicy retryPolicy, 
ScheduledExecutorService service)
+    {
+        return new CircuitBreaker(retryPolicy, service);
     }
 
+    // IMPORTANT - all methods below MUST be guarded by synchronization
+
     boolean isOpen()
     {
         return isOpen;
@@ -96,4 +102,10 @@ class CircuitBreaker
         startNanos = 0;
         return wasOpen;
     }
+
+    private CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService 
service)
+    {
+        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy 
cannot be null");
+        this.service = Objects.requireNonNull(service, "service cannot be 
null");
+    }
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
index 24eba01..270ca71 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Objects;
@@ -35,7 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * </p>
  *
  * <p>
- *     This noisy herding can be avoided by using the circuit breaking 
listener decorator. When it
+ *     This noisy herding can be avoided by using the circuit breaking 
listener. When it
  *     receives {@link 
org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit
  *     becomes "open" (based on the provided {@link 
org.apache.curator.RetryPolicy}) and will ignore
  *     future connection state changes until RetryPolicy timeout has elapsed. 
Note: however, if the connection
@@ -44,10 +43,10 @@ import java.util.concurrent.ScheduledExecutorService;
  * </p>
  *
  * <p>
- *     When the circuit decorator is closed, all connection state changes are 
forwarded to the managed
+ *     When the circuit is closed, all connection state changes are forwarded 
to the managed
  *     listener. When the first disconnected state is received, the circuit 
becomes open. The state change
  *     that caused the circuit to open is sent to the managed listener and the 
RetryPolicy will be used to
- *     get a delay amount. While the delay is active, the decorator will store 
state changes but will not
+ *     get a delay amount. While the delay is active, the circuit breaker will 
store state changes but will not
  *     forward them to the managed listener (except, however, the first time 
the state changes from SUSPENDED to LOST).
  *     When the delay elapses, if the connection has been restored, the 
circuit closes and forwards the
  *     new state to the managed listener. If the connection has not been 
restored, the RetryPolicy is checked
@@ -55,6 +54,23 @@ import java.util.concurrent.ScheduledExecutorService;
  *     RetryPolicy indicates that retries are exhausted then the circuit 
closes - if the current state
  *     is different than the state that caused the circuit to open it is 
forwarded to the managed listener.
  * </p>
+ *
+ * <p>
+ *     NOTE: You should not use this listener directly. Instead, set {@link 
org.apache.curator.framework.state.ConnectionStateListenerDecorator}
+ *     in the {@link 
org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerDecorator(ConnectionStateListenerDecorator)}.
+ * </p>
+ *
+ * <p>
+ *     E.g.
+ * <code><pre>
+ * ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreaking(...retry policy for circuit 
breaking...);
+ * CuratorFramework client = CuratorFrameworkFactory.builder()
+ *     .connectionStateListenerDecorator(decorator)
+ *     ... etc ...
+ *     .build();
+ * // all connection state listeners set for "client" will get circuit 
breaking behavior
+ * </pre></code>
+ * </p>
  */
 public class CircuitBreakingConnectionStateListener implements 
ConnectionStateListener
 {
@@ -77,7 +93,7 @@ public class CircuitBreakingConnectionStateListener 
implements ConnectionStateLi
      */
     public CircuitBreakingConnectionStateListener(CuratorFramework client, 
ConnectionStateListener listener, RetryPolicy retryPolicy)
     {
-        this(client, listener, retryPolicy, 
ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
+        this(client, listener, CircuitBreaker.build(retryPolicy));
     }
 
     /**
@@ -88,9 +104,14 @@ public class CircuitBreakingConnectionStateListener 
implements ConnectionStateLi
      */
     public CircuitBreakingConnectionStateListener(CuratorFramework client, 
ConnectionStateListener listener, RetryPolicy retryPolicy, 
ScheduledExecutorService service)
     {
-        this.client = client;
+        this(client, listener, CircuitBreaker.build(retryPolicy, service));
+    }
+
+    CircuitBreakingConnectionStateListener(CuratorFramework client, 
ConnectionStateListener listener, CircuitBreaker circuitBreaker)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
         this.listener = Objects.requireNonNull(listener, "listener cannot be 
null");
-        circuitBreaker = new CircuitBreaker(retryPolicy, service);
+        this.circuitBreaker = Objects.requireNonNull(circuitBreaker, 
"circuitBreaker cannot be null");
         reset();
     }
 
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java
new file mode 100644
index 0000000..a29c641
--- /dev/null
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.state;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.UnaryListenerManager;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+class CircuitBreakingManager implements 
UnaryListenerManager<ConnectionStateListener>
+{
+    private final StandardListenerManager<ConnectionStateListener> decorated = 
StandardListenerManager.standard();
+    private final StandardListenerManager<ConnectionStateListener> undecorated 
= StandardListenerManager.standard();
+    private final CircuitBreakingConnectionStateListener masterListener;
+
+    CircuitBreakingManager(CuratorFramework client, CircuitBreaker 
circuitBreaker)
+    {
+        ConnectionStateListener masterStateChanged = (__, newState) -> 
decorated.forEach(listener -> listener.stateChanged(client, newState));
+        masterListener = new CircuitBreakingConnectionStateListener(client, 
masterStateChanged, circuitBreaker);
+    }
+
+    @Override
+    public void clear()
+    {
+        undecorated.clear();
+        decorated.clear();
+    }
+
+    @Override
+    public int size()
+    {
+        return decorated.size() + undecorated.size();
+    }
+
+    @Override
+    public void forEach(Consumer<ConnectionStateListener> function)
+    {
+        undecorated.forEach(function);
+        function.accept(masterListener);
+    }
+
+    @Override
+    public void addListener(ConnectionStateListener listener)
+    {
+        if ( listener.doNotDecorate() )
+        {
+            undecorated.addListener(listener);
+        }
+        else
+        {
+            decorated.addListener(listener);
+        }
+    }
+
+    @Override
+    public void addListener(ConnectionStateListener listener, Executor 
executor)
+    {
+        if ( listener.doNotDecorate() )
+        {
+            undecorated.addListener(listener, executor);
+        }
+        else
+        {
+            decorated.addListener(listener, executor);
+        }
+    }
+
+    @Override
+    public void removeListener(ConnectionStateListener listener)
+    {
+        decorated.addListener(listener);
+        undecorated.addListener(listener);
+    }
+}
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
index b95c4b3..e22b2c4 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
@@ -20,6 +20,8 @@ package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.UnaryListenerManager;
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
@@ -49,33 +51,76 @@ import java.util.concurrent.ScheduledExecutorService;
 @FunctionalInterface
 public interface ConnectionStateListenerDecorator
 {
-    ConnectionStateListener decorateListener(CuratorFramework client, 
ConnectionStateListener actual);
+    /**
+     * @deprecated Added in version 4.2.0 this feature has been refined and 
this method is no longer used. It will be removed in a future version.
+     */
+    @Deprecated
+    default ConnectionStateListener decorateListener(CuratorFramework client, 
ConnectionStateListener actual) {
+        throw new UnsupportedOperationException("This method has been 
deprecated");
+    }
+
+    /**
+     * Create listener manager that decorates listeners as needed
+     *
+     * @param client curator client
+     * @return manager
+     */
+    UnaryListenerManager<ConnectionStateListener> manager(CuratorFramework 
client);
 
     /**
      * Pass through - does no decoration
      */
-    ConnectionStateListenerDecorator standard = (__, actual) -> actual;
+    ConnectionStateListenerDecorator standard = (__) -> 
StandardListenerManager.standard();
+
+    /**
+     * Decorates listeners with circuit breaking behavior using {@link 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     * and a shared {@link org.apache.curator.framework.state.CircuitBreaker}.
+     *
+     * @param retryPolicy the circuit breaking policy to use
+     * @return new decorator
+     */
+    static ConnectionStateListenerDecorator 
circuitBreakingDecorator(RetryPolicy retryPolicy)
+    {
+        return client -> new CircuitBreakingManager(client, 
CircuitBreaker.build(retryPolicy));
+    }
+
+    /**
+     * Decorates listeners with circuit breaking behavior using {@link 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     * and a shared {@link org.apache.curator.framework.state.CircuitBreaker}.
+     *
+     * @param retryPolicy the circuit breaking policy to use
+     * @return new decorator
+     */
+    static ConnectionStateListenerDecorator 
circuitBreakingDecorator(RetryPolicy retryPolicy, ScheduledExecutorService 
service)
+    {
+        return client -> new CircuitBreakingManager(client, 
CircuitBreaker.build(retryPolicy, service));
+    }
 
     /**
-     * Decorates the listener with circuit breaking behavior using {@link 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     * Decorates listeners with circuit breaking behavior using {@link 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
      *
      * @param retryPolicy the circuit breaking policy to use
      * @return new decorator
+     * @deprecated use {@link 
#circuitBreakingDecorator(org.apache.curator.RetryPolicy)} instead. This 
version creates a new CircuitBreaker for each listener.
+     * @see 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener
      */
+    @Deprecated
     static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy 
retryPolicy)
     {
-        return (client, actual) -> new 
CircuitBreakingConnectionStateListener(client, actual, retryPolicy);
+        return client -> StandardListenerManager.mappingStandard(listener -> 
listener.doNotDecorate() ? listener : new 
CircuitBreakingConnectionStateListener(client, listener, retryPolicy));
     }
 
     /**
-     * Decorates the listener with circuit breaking behavior using {@link 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     * Decorates listeners with circuit breaking behavior using {@link 
org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
      *
      * @param retryPolicy the circuit breaking policy to use
      * @param service the scheduler to use
      * @return new decorator
+     * @deprecated use {@link 
#circuitBreakingDecorator(org.apache.curator.RetryPolicy, 
java.util.concurrent.ScheduledExecutorService)} instead. This version creates a 
new CircuitBreaker for each listener.
      */
+    @Deprecated
     static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy 
retryPolicy, ScheduledExecutorService service)
     {
-        return (client, actual) -> new 
CircuitBreakingConnectionStateListener(client, actual, retryPolicy, service);
+        return client -> StandardListenerManager.mappingStandard(listener -> 
listener.doNotDecorate() ? listener : new 
CircuitBreakingConnectionStateListener(client, listener, retryPolicy, service));
     }
 }
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 55e17c8..a20eab3 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -22,7 +22,7 @@ package org.apache.curator.framework.state;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.listen.UnaryListenerManager;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -71,7 +71,7 @@ public class ConnectionStateManager implements Closeable
     private final AtomicBoolean initialConnectMessageSent = new 
AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
-    private final StandardListenerManager<ConnectionStateListener> listeners;
+    private final UnaryListenerManager<ConnectionStateListener> listeners;
 
     // guarded by sync
     private ConnectionState currentConnectionState;
@@ -113,7 +113,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = 
ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
-        listeners = StandardListenerManager.mappingStandard(listener -> 
listener.doNotDecorate() ? listener : 
connectionStateListenerDecorator.decorateListener(client, listener));
+        listeners = connectionStateListenerDecorator.manager(client);
     }
 
     /**
@@ -272,7 +272,7 @@ public class ConnectionStateManager implements Closeable
                 final ConnectionState newState = eventQueue.poll(pollMaxMs, 
TimeUnit.MILLISECONDS);
                 if ( newState != null )
                 {
-                    if ( listeners.size() == 0 )
+                    if ( listeners.isEmpty() )
                     {
                         log.warn("There are no ConnectionStateListeners 
registered.");
                     }
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
index f4096cb..bee917e 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
@@ -58,7 +58,7 @@ public class TestCircuitBreaker
         final int retryQty = 1;
         final Duration delay = Duration.ofSeconds(10);
 
-        CircuitBreaker circuitBreaker = new CircuitBreaker(new 
RetryNTimes(retryQty, (int)delay.toMillis()), service);
+        CircuitBreaker circuitBreaker = CircuitBreaker.build(new 
RetryNTimes(retryQty, (int)delay.toMillis()), service);
         AtomicInteger counter = new AtomicInteger(0);
 
         Assert.assertTrue(circuitBreaker.tryToOpen(counter::incrementAndGet));
@@ -79,7 +79,7 @@ public class TestCircuitBreaker
     @Test
     public void testVariousOpenRetryFails()
     {
-        CircuitBreaker circuitBreaker = new CircuitBreaker(new 
RetryForever(1), service);
+        CircuitBreaker circuitBreaker = CircuitBreaker.build(new 
RetryForever(1), service);
         Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
         Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
         Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
@@ -91,7 +91,7 @@ public class TestCircuitBreaker
     public void testWithRetryUntilElapsed()
     {
         RetryPolicy retryPolicy = new RetryUntilElapsed(10000, 10000);
-        CircuitBreaker circuitBreaker = new CircuitBreaker(retryPolicy, 
service);
+        CircuitBreaker circuitBreaker = CircuitBreaker.build(retryPolicy, 
service);
         Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
         Assert.assertEquals(lastDelay[0], Duration.ofMillis(10000));
     }
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 7107efa..a0b2187 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -622,7 +622,8 @@ public class LeaderLatch implements Closeable
         
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, 
null));
     }
 
-    private void handleStateChange(ConnectionState newState)
+    @VisibleForTesting
+    protected void handleStateChange(ConnectionState newState)
     {
         switch ( newState )
         {
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 9717302..4aa298f 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -43,6 +43,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -50,53 +51,111 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class TestLeaderLatch extends BaseClassForTests
 {
     private static final String PATH_NAME = "/one/two/me";
     private static final int MAX_LOOPS = 5;
 
+    private static class Holder
+    {
+        final AtomicInteger stateChangeCount = new AtomicInteger(0);
+        final CountDownLatch isLockedLatch = new CountDownLatch(1);
+        volatile LeaderLatch latch;
+    }
+
     @Test
     public void testWithCircuitBreaker() throws Exception
     {
+        final int threadQty = 5;
+
+        ExecutorService executorService = 
Executors.newFixedThreadPool(threadQty);
+        List<Holder> holders = Collections.emptyList();
         Timing2 timing = new Timing2();
-        ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreaking(new 
RetryForever(timing.multiple(2).milliseconds()));
-        try ( CuratorFramework client = CuratorFrameworkFactory.builder()
+        ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreakingDecorator(new 
RetryForever(timing.multiple(2).milliseconds()));
+        CuratorFramework client = CuratorFrameworkFactory.builder()
             .connectString(server.getConnectString())
             .retryPolicy(new RetryOneTime(1))
             .connectionStateListenerDecorator(decorator)
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
-            .build() )
-        {
+            .build();
+        try {
             client.start();
-            AtomicInteger resetCount = new AtomicInteger(0);
-            try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+            client.create().forPath("/hey");
+
+            Semaphore lostSemaphore = new Semaphore(0);
+            ConnectionStateListener undecoratedListener = new 
ConnectionStateListener()
             {
                 @Override
-                void reset() throws Exception
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
                 {
-                    resetCount.incrementAndGet();
-                    super.reset();
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        lostSemaphore.release();
+                    }
                 }
-            } )
-            {
-                latch.start();
-                
Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
 
-                for ( int i = 0; i < 5; ++i )
+                @Override
+                public boolean doNotDecorate()
                 {
-                    server.stop();
-                    server.restart();
-                    timing.sleepABit();
+                    return true;
                 }
-                
Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
-                Assert.assertEquals(resetCount.get(), 2);
+            };
+            
client.getConnectionStateListenable().addListener(undecoratedListener);
+
+            holders = IntStream.range(0, threadQty)
+                .mapToObj(index -> {
+                    Holder holder = new Holder();
+                    holder.latch = new LeaderLatch(client, "/foo/bar/" + index)
+                    {
+                        @Override
+                        protected void handleStateChange(ConnectionState 
newState)
+                        {
+                            holder.stateChangeCount.incrementAndGet();
+                            super.handleStateChange(newState);
+                        }
+                    };
+                    return holder;
+                })
+                .collect(Collectors.toList());
+
+            holders.forEach(holder -> {
+                executorService.submit(() -> {
+                    holder.latch.start();
+                    
Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
+                    holder.isLockedLatch.countDown();
+                    return null;
+                });
+                timing.awaitLatch(holder.isLockedLatch);
+            });
+
+            for ( int i = 0; i < 5; ++i )
+            {
+                server.stop();
+                Assert.assertTrue(timing.acquireSemaphore(lostSemaphore));
+                server.restart();
+                timing.sleepABit();
             }
+
+            for ( Holder holder : holders )
+            {
+                
Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
+                Assert.assertEquals(holder.stateChangeCount.get(), 3);  // 1 
suspend, 1 lost, 1 reconnected
+            }
+        }
+        finally
+        {
+            holders.forEach(holder -> 
CloseableUtils.closeQuietly(holder.latch));
+            CloseableUtils.closeQuietly(client);
+            executorService.shutdownNow();
         }
     }
 
diff --git a/src/site/confluence/utilities.confluence 
b/src/site/confluence/utilities.confluence
index 720d8d9..a6f57d5 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -19,13 +19,13 @@ h2. Circuit Breaking ConnectionStateListener
 During network outages ZooKeeper can become very noisy sending 
connection/disconnection events in rapid succession. Curator recipes respond to 
these
 messages by resetting state, etc. E.g. LeaderLatch must delete its lock node 
and try to recreate it in order to try to re\-obtain leadership, etc.
 
-This noisy herding can be avoided by using the circuit breaking listener 
decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes 
"open"
+This noisy herding can be avoided by using the circuit breaking listener. When 
it receives ConnectionState.SUSPENDED, the circuit becomes "open"
 (based on the provided RetryPolicy) and will ignore future connection state 
changes until RetryPolicy timeout has elapsed. Note: however, if the connection
 goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST 
state is sent.
 
-When the circuit decorator is closed, all connection state changes are 
forwarded to the managed listener. When the first disconnected state is 
received, the
+When the circuit is closed, all connection state changes are forwarded to the 
managed listener. When the first disconnected state is received, the
 circuit becomes open. The state change that caused the circuit to open is sent 
to the managed listener and the RetryPolicy will be used to get a delay amount.
-While the delay is active, the decorator will store state changes but will not 
forward them to the managed listener (except, however, the first time the state
+While the delay is active, the circuit breaker will store state changes but 
will not forward them to the managed listener (except, however, the first time 
the state
 changes from SUSPENDED to LOST). When the delay elapses, if the connection has 
been restored, the circuit closes and forwards the new state to the managed 
listener.
 If the connection has not been restored, the RetryPolicy is checked again. If 
the RetryPolicy indicates another retry is allowed the process repeats. If, 
however,
 the RetryPolicy indicates that retries are exhausted then the circuit closes 
\- if the current state is different than the state that caused the circuit to 
open it is
@@ -34,12 +34,12 @@ forwarded to the managed listener.
 You can enable the Circuit Breaking ConnectionStateListener during creation of 
your CuratorFramework instance. E.g.
 
 {code}
-ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreaking(...);
+ConnectionStateListenerDecorator decorator = 
ConnectionStateListenerDecorator.circuitBreaking(...retry policy for circuit 
breaking...);
 CuratorFramework client = CuratorFrameworkFactory.builder()
-    ...
-    .connectionStateListenerDecorator(decorator)
-    ...
-    .build();
+   .connectionStateListenerDecorator(decorator)
+   ... etc ...
+   .build();
+// all connection state listeners set for "client" will get circuit breaking 
behavior
 {code}
 
 h2. Locker

Reply via email to