This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch fix-circuit-breaker in repository https://gitbox.apache.org/repos/asf/curator.git
commit 5fcf58149635c6f4733cdcc78c122bc1bcb51407 Author: randgalt <[email protected]> AuthorDate: Sun Jul 28 01:01:13 2019 -0500 wip --- .../curator/framework/listen/ListenerManager.java | 5 ++ .../framework/listen/StandardListenerManager.java | 2 +- ...tenerManager.java => UnaryListenerManager.java} | 26 +----- .../curator/framework/state/CircuitBreaker.java | 20 ++++- .../CircuitBreakingConnectionStateListener.java | 35 ++++++-- .../framework/state/CircuitBreakingManager.java | 91 ++++++++++++++++++++ .../state/ConnectionStateListenerDecorator.java | 57 +++++++++++-- .../framework/state/ConnectionStateManager.java | 8 +- .../framework/state/TestCircuitBreaker.java | 6 +- .../framework/recipes/leader/LeaderLatch.java | 3 +- .../framework/recipes/leader/TestLeaderLatch.java | 97 +++++++++++++++++----- src/site/confluence/utilities.confluence | 16 ++-- 12 files changed, 291 insertions(+), 75 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java index cab0426..85bc8f9 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java @@ -41,4 +41,9 @@ public interface ListenerManager<K, V> extends Listenable<K> * @param function function to call for each listener */ void forEach(Consumer<V> function); + + default boolean isEmpty() + { + return size() == 0; + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java index e07fe47..8213967 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java +++ 
b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java @@ -26,7 +26,7 @@ import java.util.function.UnaryOperator; /** * Non mapping version of a listener container */ -public class StandardListenerManager<T> implements ListenerManager<T, T> +public class StandardListenerManager<T> implements UnaryListenerManager<T> { private final ListenerManager<T, T> container; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java similarity index 63% copy from curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java copy to curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java index cab0426..54497f4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/UnaryListenerManager.java @@ -18,27 +18,9 @@ */ package org.apache.curator.framework.listen; -import java.util.function.Consumer; - -public interface ListenerManager<K, V> extends Listenable<K> +/** + * A {@link ListenerManager} that doesn't do any mapping + */ +public interface UnaryListenerManager<T> extends ListenerManager<T, T> { - /** - * Remove all listeners - */ - void clear(); - - /** - * Return the number of listeners - * - * @return number - */ - int size(); - - /** - * Utility - apply the given function to each listener. The function receives - * the listener as an argument. 
- * - * @param function function to call for each listener - */ - void forEach(Consumer<V> function); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java index 03e44f8..c207128 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java @@ -20,12 +20,12 @@ package org.apache.curator.framework.state; import org.apache.curator.RetryPolicy; import org.apache.curator.RetrySleeper; +import org.apache.curator.utils.ThreadUtils; import java.time.Duration; import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -// must be guarded by sync class CircuitBreaker { private final RetryPolicy retryPolicy; @@ -35,12 +35,18 @@ class CircuitBreaker private int retryCount = 0; private long startNanos = 0; - CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service) + static CircuitBreaker build(RetryPolicy retryPolicy) { - this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null"); - this.service = Objects.requireNonNull(service, "service cannot be null"); + return new CircuitBreaker(retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener")); + } + + static CircuitBreaker build(RetryPolicy retryPolicy, ScheduledExecutorService service) + { + return new CircuitBreaker(retryPolicy, service); } + // IMPORTANT - all methods below MUST be guarded by synchronization + boolean isOpen() { return isOpen; @@ -96,4 +102,10 @@ class CircuitBreaker startNanos = 0; return wasOpen; } + + private CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service) + { + this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null"); + this.service = 
Objects.requireNonNull(service, "service cannot be null"); + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java index 24eba01..270ca71 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java @@ -20,7 +20,6 @@ package org.apache.curator.framework.state; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; @@ -35,7 +34,7 @@ import java.util.concurrent.ScheduledExecutorService; * </p> * * <p> - * This noisy herding can be avoided by using the circuit breaking listener decorator. When it + * This noisy herding can be avoided by using the circuit breaking listener. When it * receives {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit * becomes "open" (based on the provided {@link org.apache.curator.RetryPolicy}) and will ignore * future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection @@ -44,10 +43,10 @@ import java.util.concurrent.ScheduledExecutorService; * </p> * * <p> - * When the circuit decorator is closed, all connection state changes are forwarded to the managed + * When the circuit is closed, all connection state changes are forwarded to the managed * listener. When the first disconnected state is received, the circuit becomes open. The state change * that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to - * get a delay amount. 
While the delay is active, the decorator will store state changes but will not + * get a delay amount. While the delay is active, the circuit breaker will store state changes but will not * forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST). * When the delay elapses, if the connection has been restored, the circuit closes and forwards the * new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked @@ -55,6 +54,23 @@ import java.util.concurrent.ScheduledExecutorService; * RetryPolicy indicates that retries are exhausted then the circuit closes - if the current state * is different than the state that caused the circuit to open it is forwarded to the managed listener. * </p> + * + * <p> + * NOTE: You should not use this listener directly. Instead, set {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator} + * in the {@link org.apache.curator.framework.CuratorFrameworkFactory.Builder#connectionStateListenerDecorator(ConnectionStateListenerDecorator)}. + * </p> + * + * <p> + * E.g. + * <code><pre> + * ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...retry policy for circuit breaking...); + * CuratorFramework client = CuratorFrameworkFactory.builder() + * .connectionStateListenerDecorator(decorator) + * ... etc ... 
+ * .build(); + * // all connection state listeners set for "client" will get circuit breaking behavior + * </pre></code> + * </p> */ public class CircuitBreakingConnectionStateListener implements ConnectionStateListener { @@ -77,7 +93,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi */ public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy) { - this(client, listener, retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener")); + this(client, listener, CircuitBreaker.build(retryPolicy)); } /** @@ -88,9 +104,14 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi */ public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy, ScheduledExecutorService service) { - this.client = client; + this(client, listener, CircuitBreaker.build(retryPolicy, service)); + } + + CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, CircuitBreaker circuitBreaker) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); this.listener = Objects.requireNonNull(listener, "listener cannot be null"); - circuitBreaker = new CircuitBreaker(retryPolicy, service); + this.circuitBreaker = Objects.requireNonNull(circuitBreaker, "circuitBreaker cannot be null"); reset(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java new file mode 100644 index 0000000..a29c641 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingManager.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.state; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.listen.UnaryListenerManager; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +class CircuitBreakingManager implements UnaryListenerManager<ConnectionStateListener> +{ + private final StandardListenerManager<ConnectionStateListener> decorated = StandardListenerManager.standard(); + private final StandardListenerManager<ConnectionStateListener> undecorated = StandardListenerManager.standard(); + private final CircuitBreakingConnectionStateListener masterListener; + + CircuitBreakingManager(CuratorFramework client, CircuitBreaker circuitBreaker) + { + ConnectionStateListener masterStateChanged = (__, newState) -> decorated.forEach(listener -> listener.stateChanged(client, newState)); + masterListener = new CircuitBreakingConnectionStateListener(client, masterStateChanged, circuitBreaker); + } + + @Override + public void clear() + { + undecorated.clear(); + decorated.clear(); + } + + @Override + public int size() + { + return decorated.size() + undecorated.size(); + } + + @Override + public void forEach(Consumer<ConnectionStateListener> 
function) + { + undecorated.forEach(function); + function.accept(masterListener); + } + + @Override + public void addListener(ConnectionStateListener listener) + { + if ( listener.doNotDecorate() ) + { + undecorated.addListener(listener); + } + else + { + decorated.addListener(listener); + } + } + + @Override + public void addListener(ConnectionStateListener listener, Executor executor) + { + if ( listener.doNotDecorate() ) + { + undecorated.addListener(listener, executor); + } + else + { + decorated.addListener(listener, executor); + } + } + + @Override + public void removeListener(ConnectionStateListener listener) + { + decorated.removeListener(listener); + undecorated.removeListener(listener); + } +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java index b95c4b3..e22b2c4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java @@ -20,6 +20,8 @@ package org.apache.curator.framework.state; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.listen.UnaryListenerManager; import java.util.concurrent.ScheduledExecutorService; /** @@ -49,33 +51,76 @@ import java.util.concurrent.ScheduledExecutorService; @FunctionalInterface public interface ConnectionStateListenerDecorator { - ConnectionStateListener decorateListener(CuratorFramework client, ConnectionStateListener actual); + /** + * @deprecated Added in version 4.2.0 this feature has been refined and this method is no longer used. It will be removed in a future version.
+ */ + @Deprecated + default ConnectionStateListener decorateListener(CuratorFramework client, ConnectionStateListener actual) { + throw new UnsupportedOperationException("This method has been deprecated"); + } + + /** + * Create listener manager that decorates listeners as needed + * + * @param client curator client + * @return manager + */ + UnaryListenerManager<ConnectionStateListener> manager(CuratorFramework client); /** * Pass through - does no decoration */ - ConnectionStateListenerDecorator standard = (__, actual) -> actual; + ConnectionStateListenerDecorator standard = (__) -> StandardListenerManager.standard(); + + /** + * Decorates listeners with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} + * and a shared {@link org.apache.curator.framework.state.CircuitBreaker}. + * + * @param retryPolicy the circuit breaking policy to use + * @return new decorator + */ + static ConnectionStateListenerDecorator circuitBreakingDecorator(RetryPolicy retryPolicy) + { + return client -> new CircuitBreakingManager(client, CircuitBreaker.build(retryPolicy)); + } + + /** + * Decorates listeners with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} + * and a shared {@link org.apache.curator.framework.state.CircuitBreaker}. 
+ * + * @param retryPolicy the circuit breaking policy to use + * @return new decorator + */ + static ConnectionStateListenerDecorator circuitBreakingDecorator(RetryPolicy retryPolicy, ScheduledExecutorService service) + { + return client -> new CircuitBreakingManager(client, CircuitBreaker.build(retryPolicy, service)); + } /** - * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} + * Decorates listeners with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} * * @param retryPolicy the circuit breaking policy to use * @return new decorator + * @deprecated use {@link #circuitBreakingDecorator(org.apache.curator.RetryPolicy)} instead. This version creates a new CircuitBreaker for each listener. + * @see org.apache.curator.framework.state.CircuitBreakingConnectionStateListener */ + @Deprecated static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy) { - return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy); + return client -> StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : new CircuitBreakingConnectionStateListener(client, listener, retryPolicy)); } /** - * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} + * Decorates listeners with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener} * * @param retryPolicy the circuit breaking policy to use * @param service the scheduler to use * @return new decorator + * @deprecated use {@link #circuitBreakingDecorator(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)} instead. This version creates a new CircuitBreaker for each listener. 
*/ + @Deprecated static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service) { - return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy, service); + return client -> StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : new CircuitBreakingConnectionStateListener(client, listener, retryPolicy, service)); } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 55e17c8..a20eab3 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -22,7 +22,7 @@ package org.apache.curator.framework.state; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.listen.StandardListenerManager; +import org.apache.curator.framework.listen.UnaryListenerManager; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; @@ -71,7 +71,7 @@ public class ConnectionStateManager implements Closeable private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - private final StandardListenerManager<ConnectionStateListener> listeners; + private final UnaryListenerManager<ConnectionStateListener> listeners; // guarded by sync private ConnectionState currentConnectionState; @@ -113,7 +113,7 @@ public class ConnectionStateManager implements Closeable threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); } 
service = Executors.newSingleThreadExecutor(threadFactory); - listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener)); + listeners = connectionStateListenerDecorator.manager(client); } /** @@ -272,7 +272,7 @@ public class ConnectionStateManager implements Closeable final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS); if ( newState != null ) { - if ( listeners.size() == 0 ) + if ( listeners.isEmpty() ) { log.warn("There are no ConnectionStateListeners registered."); } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java index f4096cb..bee917e 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java @@ -58,7 +58,7 @@ public class TestCircuitBreaker final int retryQty = 1; final Duration delay = Duration.ofSeconds(10); - CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service); + CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryNTimes(retryQty, (int)delay.toMillis()), service); AtomicInteger counter = new AtomicInteger(0); Assert.assertTrue(circuitBreaker.tryToOpen(counter::incrementAndGet)); @@ -79,7 +79,7 @@ public class TestCircuitBreaker @Test public void testVariousOpenRetryFails() { - CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service); + CircuitBreaker circuitBreaker = CircuitBreaker.build(new RetryForever(1), service); Assert.assertFalse(circuitBreaker.tryToRetry(() -> {})); Assert.assertTrue(circuitBreaker.tryToOpen(() -> {})); Assert.assertFalse(circuitBreaker.tryToOpen(() -> {})); @@ -91,7 +91,7 @@ public class TestCircuitBreaker public 
void testWithRetryUntilElapsed() { RetryPolicy retryPolicy = new RetryUntilElapsed(10000, 10000); - CircuitBreaker circuitBreaker = new CircuitBreaker(retryPolicy, service); + CircuitBreaker circuitBreaker = CircuitBreaker.build(retryPolicy, service); Assert.assertTrue(circuitBreaker.tryToOpen(() -> {})); Assert.assertEquals(lastDelay[0], Duration.ofMillis(10000)); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 7107efa..a0b2187 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -622,7 +622,8 @@ public class LeaderLatch implements Closeable client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } - private void handleStateChange(ConnectionState newState) + @VisibleForTesting + protected void handleStateChange(ConnectionState newState) { switch ( newState ) { diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 9717302..4aa298f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -43,6 +43,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -50,53 +51,111 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class TestLeaderLatch extends BaseClassForTests { private static final String PATH_NAME = "/one/two/me"; private static final int MAX_LOOPS = 5; + private static class Holder + { + final AtomicInteger stateChangeCount = new AtomicInteger(0); + final CountDownLatch isLockedLatch = new CountDownLatch(1); + volatile LeaderLatch latch; + } + @Test public void testWithCircuitBreaker() throws Exception { + final int threadQty = 5; + + ExecutorService executorService = Executors.newFixedThreadPool(threadQty); + List<Holder> holders = Collections.emptyList(); Timing2 timing = new Timing2(); - ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds())); - try ( CuratorFramework client = CuratorFrameworkFactory.builder() + ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreakingDecorator(new RetryForever(timing.multiple(2).milliseconds())); + CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) .connectionStateListenerDecorator(decorator) .connectionTimeoutMs(timing.connection()) .sessionTimeoutMs(timing.session()) - .build() ) - { + .build(); + try { client.start(); - AtomicInteger resetCount = new AtomicInteger(0); - try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar") + client.create().forPath("/hey"); + + Semaphore lostSemaphore = new Semaphore(0); + ConnectionStateListener undecoratedListener = new ConnectionStateListener() { @Override - void reset() throws Exception + public void stateChanged(CuratorFramework client, 
ConnectionState newState) { - resetCount.incrementAndGet(); - super.reset(); + if ( newState == ConnectionState.LOST ) + { + lostSemaphore.release(); + } } - } ) - { - latch.start(); - Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - for ( int i = 0; i < 5; ++i ) + @Override + public boolean doNotDecorate() { - server.stop(); - server.restart(); - timing.sleepABit(); + return true; } - Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - Assert.assertEquals(resetCount.get(), 2); + }; + client.getConnectionStateListenable().addListener(undecoratedListener); + + holders = IntStream.range(0, threadQty) + .mapToObj(index -> { + Holder holder = new Holder(); + holder.latch = new LeaderLatch(client, "/foo/bar/" + index) + { + @Override + protected void handleStateChange(ConnectionState newState) + { + holder.stateChangeCount.incrementAndGet(); + super.handleStateChange(newState); + } + }; + return holder; + }) + .collect(Collectors.toList()); + + holders.forEach(holder -> { + executorService.submit(() -> { + holder.latch.start(); + Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + holder.isLockedLatch.countDown(); + return null; + }); + timing.awaitLatch(holder.isLockedLatch); + }); + + for ( int i = 0; i < 5; ++i ) + { + server.stop(); + Assert.assertTrue(timing.acquireSemaphore(lostSemaphore)); + server.restart(); + timing.sleepABit(); } + + for ( Holder holder : holders ) + { + Assert.assertTrue(holder.latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + Assert.assertEquals(holder.stateChangeCount.get(), 3); // 1 suspend, 1 lost, 1 reconnected + } + } + finally + { + holders.forEach(holder -> CloseableUtils.closeQuietly(holder.latch)); + CloseableUtils.closeQuietly(client); + executorService.shutdownNow(); } } diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence index 
720d8d9..a6f57d5 100644 --- a/src/site/confluence/utilities.confluence +++ b/src/site/confluence/utilities.confluence @@ -19,13 +19,13 @@ h2. Circuit Breaking ConnectionStateListener During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc. -This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open" +This noisy herding can be avoided by using the circuit breaking listener. When it receives ConnectionState.SUSPENDED, the circuit becomes "open" (based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent. -When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the +When the circuit is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount. -While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state +While the delay is active, the circuit breaker will store state changes but will not forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener. 
If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however, the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is @@ -34,12 +34,12 @@ forwarded to the managed listener. You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. E.g. {code} -ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...); +ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreakingDecorator(...retry policy for circuit breaking...); CuratorFramework client = CuratorFrameworkFactory.builder() - ... - .connectionStateListenerDecorator(decorator) - ... - .build(); + .connectionStateListenerDecorator(decorator) + ... etc ... + .build(); +// all connection state listeners set for "client" will get circuit breaking behavior {code} h2. Locker
