This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-505 in repository https://gitbox.apache.org/repos/asf/curator.git
commit 96f186abc3e4977eaac580054bd8a3cdfa79d22a Author: randgalt <[email protected]> AuthorDate: Thu Feb 7 12:10:52 2019 -0500 CURATOR-505 - interim checking - refactoring, simplifications, more testing, and documentation --- .../apache/curator/framework/CuratorFramework.java | 10 +++ .../curator/framework/CuratorFrameworkFactory.java | 2 +- .../framework/imps/CuratorFrameworkImpl.java | 13 +++- .../curator/framework/state/CircuitBreaker.java | 24 +++--- .../CircuitBreakingConnectionStateListener.java | 11 ++- .../state/ConnectionStateListenerDecorator.java | 6 +- .../framework/state/TestCircuitBreaker.java | 42 ++++++++--- ...TestCircuitBreakingConnectionStateListener.java | 86 ++++++++++++++++++++-- .../framework/recipes/leader/TestLeaderLatch.java | 28 +++---- src/site/confluence/errors.confluence | 3 +- src/site/confluence/utilities.confluence | 44 +++++++++++ 11 files changed, 214 insertions(+), 55 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 6513716..2657781 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.schema.SchemaSet; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.framework.state.ConnectionStateListenerDecorator; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -367,4 +368,13 @@ public interface CuratorFramework extends Closeable * @return decorated listener */ ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual); + + /** + * Returns a facade of the current instance that uses the given connection state listener + * decorator instead of the configured one + * + * @param newDecorator decorator to use + * @return facade + */ + CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 0cc6e0d..283a093 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -508,7 +508,7 @@ public class CuratorFrameworkFactory * @return this * @since 4.2.0 */ - public Builder connectionStateListenerFactory(ConnectionStateListenerDecorator connectionStateListenerDecorator) + public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator) { this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null"); return this; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 81cae74..f210021 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -236,6 +236,11 @@ public class CuratorFrameworkImpl implements CuratorFramework protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { + this(parent, parent.connectionStateListenerDecorator); + } + + private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator) + { client = parent.client; listeners = parent.listeners; unhandledErrorListeners = parent.unhandledErrorListeners; @@ -260,7 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework zk34CompatibilityMode = parent.zk34CompatibilityMode; ensembleTracker = null; runSafeService = parent.runSafeService; - connectionStateListenerDecorator = parent.connectionStateListenerDecorator; + this.connectionStateListenerDecorator = connectionStateListenerDecorator; } @Override @@ -599,6 +604,12 @@ public class CuratorFrameworkImpl implements CuratorFramework return connectionStateListenerDecorator.decorateListener(this, actual); } + @Override + public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator) + { + return new CuratorFrameworkImpl(this, newDecorator); + } + ACLProvider getAclProvider() { return aclProvider; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java index ad48a15..504edbc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java @@ -14,7 +14,7 @@ class CircuitBreaker private boolean isOpen = false; private int retryCount = 0; - private long openStartNanos = 0; + private long startNanos = 0; CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service) { @@ -41,13 +41,13 @@ class CircuitBreaker isOpen = true; retryCount = 0; - openStartNanos = System.nanoTime(); - if ( !tryToRetry(completion) ) + startNanos = System.nanoTime(); + if ( tryToRetry(completion) ) { - close(); - return false; + return true; } - return true; + close(); + return false; } boolean tryToRetry(Runnable completion) @@ -59,13 +59,13 @@ class CircuitBreaker long[] sleepTimeNanos = new long[]{0L}; RetrySleeper retrySleeper = (time, unit) -> sleepTimeNanos[0] = unit.toNanos(time); - if ( !retryPolicy.allowRetry(retryCount, System.nanoTime() - openStartNanos, retrySleeper) ) + if ( retryPolicy.allowRetry(retryCount, System.nanoTime() - startNanos, retrySleeper) ) { - return false; + ++retryCount; + service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS); + return true; } - ++retryCount; - service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS); - return true; + return false; } boolean close() @@ -73,7 +73,7 @@ class CircuitBreaker boolean wasOpen = isOpen; retryCount = 0; isOpen = false; - openStartNanos = 0; + startNanos = 0; return wasOpen; } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java index 12efad9..dba651a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java @@ -13,7 +13,7 @@ import java.util.concurrent.ScheduledExecutorService; * A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network * outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. * Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete - * its lock node and try to recreated it in order to try to re-obtain leadership, etc. + * its lock node and try to recreate it in order to try to re-obtain leadership, etc. * </p> * * <p> @@ -114,7 +114,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi log.debug("Could not open circuit breaker. State: {}", newState); } } - callListener(circuitInitialState); + callListener(newState); } private synchronized void handleOpenStateChange(ConnectionState newState) @@ -126,11 +126,10 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi } else { - circuitLostHasBeenSent = true; - circuitInitialState = ConnectionState.LOST; - circuitLastState = newState; log.debug("Circuit is open. State changed to LOST. Sending to listener."); - callListener(circuitInitialState); + circuitLostHasBeenSent = true; + circuitLastState = circuitInitialState = ConnectionState.LOST; + callListener(ConnectionState.LOST); } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java index 0f11c46..0ac808b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java @@ -17,14 +17,14 @@ import java.util.concurrent.ScheduledExecutorService; * <code><pre> * CuratorFramework client ... * ConnectionStateListener listener = ... - * ConnectionStateListener wrappedListener = client.wrapConnectionStateListener(listener); + * ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener); * * ... * - * client.getConnectionStateListenable().addListener(wrappedListener); + * client.getConnectionStateListenable().addListener(decoratedListener); * * // later, to remove... - * client.getConnectionStateListenable().removeListener(wrappedListener); + * client.getConnectionStateListenable().removeListener(decoratedListener); * </pre></code> * </p> */ diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java index 77ec20f..e2daa96 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java @@ -1,7 +1,9 @@ package org.apache.curator.framework.state; +import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.Test; import java.time.Duration; import java.time.temporal.ChronoUnit; @@ -12,23 +14,30 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestCircuitBreaker { + private Duration[] lastDelay = new Duration[]{Duration.ZERO}; + private ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1) + { + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) + { + lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS); + command.run(); + return null; + } + }; + + @AfterClass + public void tearDown() + { + service.shutdownNow(); + } + @Test public void testBasic() { final int retryQty = 1; final Duration delay = Duration.ofSeconds(10); - Duration[] lastDelay = new Duration[]{Duration.ZERO}; - ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1) - { - @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) - { - lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS); - command.run(); - return null; - } - }; CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service); AtomicInteger counter = new AtomicInteger(0); @@ -46,4 +55,15 @@ public class TestCircuitBreaker Assert.assertEquals(circuitBreaker.getRetryCount(), 0); Assert.assertFalse(circuitBreaker.close()); } + + @Test + public void testVariousOpenRetryFails() + { + CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service); + Assert.assertFalse(circuitBreaker.tryToRetry(() -> {})); + Assert.assertTrue(circuitBreaker.tryToOpen(() -> {})); + Assert.assertFalse(circuitBreaker.tryToOpen(() -> {})); + Assert.assertTrue(circuitBreaker.close()); + Assert.assertFalse(circuitBreaker.close()); + } } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java index 36a1954..1712eed 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java @@ -8,7 +8,8 @@ import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.compatibility.Timing2; import org.testng.Assert; -import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -20,7 +21,7 @@ public class TestCircuitBreakingConnectionStateListener private final CuratorFramework dummyClient = CuratorFrameworkFactory.newClient("foo", new RetryOneTime(1)); private final Timing2 timing = new Timing2(); private final Timing2 retryTiming = timing.multiple(.25); - private final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1); + private volatile ScheduledThreadPoolExecutor service; private static class RecordingListener implements ConnectionStateListener { @@ -49,7 +50,13 @@ public class TestCircuitBreakingConnectionStateListener } } - @AfterClass + @BeforeMethod + public void setup() + { + service = new ScheduledThreadPoolExecutor(1); + } + + @AfterMethod public void tearDown() { service.shutdownNow(); @@ -60,7 +67,7 @@ public class TestCircuitBreakingConnectionStateListener { RecordingListener recordingListener = new RecordingListener(); TestRetryPolicy retryPolicy = new TestRetryPolicy(); - final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service); + CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service); listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED); @@ -94,8 +101,11 @@ public class TestCircuitBreakingConnectionStateListener TestRetryPolicy retryPolicy = new TestRetryPolicy(); CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service); - listener.stateChanged(dummyClient, ConnectionState.LOST); - listener.stateChanged(dummyClient, ConnectionState.LOST); // second LOST ignored + synchronized(listener) // don't let retry policy run while we're pushing state changes + { + listener.stateChanged(dummyClient, ConnectionState.LOST); + listener.stateChanged(dummyClient, ConnectionState.LOST); // second LOST ignored + } Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST); Assert.assertTrue(recordingListener.stateChanges.isEmpty()); @@ -112,6 +122,7 @@ public class TestCircuitBreakingConnectionStateListener listener.stateChanged(dummyClient, ConnectionState.LOST); Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST); + Assert.assertFalse(listener.isOpen()); listener.stateChanged(dummyClient, ConnectionState.LOST); Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST); Assert.assertFalse(listener.isOpen()); @@ -122,7 +133,7 @@ public class TestCircuitBreakingConnectionStateListener { RecordingListener recordingListener = new RecordingListener(); RetryPolicy retryOnce = new RetryOneTime(retryTiming.milliseconds()); - final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service); + CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service); synchronized(listener) // don't let retry policy run while we're pushing state changes { @@ -134,4 +145,65 @@ public class TestCircuitBreakingConnectionStateListener Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED); Assert.assertFalse(listener.isOpen()); } + + @Test + public void testSuspendedToLostRatcheting() throws Exception + { + RecordingListener recordingListener = new RecordingListener(); + RetryPolicy retryInfinite = new RetryForever(Integer.MAX_VALUE); + CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryInfinite, service); + + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + Assert.assertFalse(listener.isOpen()); + Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED); + + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + Assert.assertTrue(listener.isOpen()); + Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED); + + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + listener.stateChanged(dummyClient, ConnectionState.READ_ONLY); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + listener.stateChanged(dummyClient, ConnectionState.READ_ONLY); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + Assert.assertTrue(recordingListener.stateChanges.isEmpty()); + Assert.assertTrue(listener.isOpen()); + + listener.stateChanged(dummyClient, ConnectionState.LOST); + Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST); + Assert.assertTrue(listener.isOpen()); + + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + listener.stateChanged(dummyClient, ConnectionState.READ_ONLY); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + listener.stateChanged(dummyClient, ConnectionState.READ_ONLY); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + Assert.assertTrue(recordingListener.stateChanges.isEmpty()); + Assert.assertTrue(listener.isOpen()); + + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + listener.stateChanged(dummyClient, ConnectionState.READ_ONLY); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.LOST); + listener.stateChanged(dummyClient, ConnectionState.LOST); + listener.stateChanged(dummyClient, ConnectionState.LOST); + listener.stateChanged(dummyClient, ConnectionState.RECONNECTED); + listener.stateChanged(dummyClient, ConnectionState.READ_ONLY); + listener.stateChanged(dummyClient, ConnectionState.LOST); + listener.stateChanged(dummyClient, ConnectionState.SUSPENDED); + listener.stateChanged(dummyClient, ConnectionState.LOST); + Assert.assertTrue(recordingListener.stateChanges.isEmpty()); + Assert.assertTrue(listener.isOpen()); + } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 439f6c8..bf2abd4 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -64,18 +64,18 @@ public class TestLeaderLatch extends BaseClassForTests public void testWithCircuitBreaker() throws Exception { Timing2 timing = new Timing2(); - ConnectionStateListenerDecorator factory = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.milliseconds())); + ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds())); try ( CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) - .connectionStateListenerFactory(factory) + .connectionStateListenerDecorator(decorator) .connectionTimeoutMs(timing.connection()) .sessionTimeoutMs(timing.session()) .build() ) { client.start(); AtomicInteger resetCount = new AtomicInteger(0); - LeaderLatch latch = new LeaderLatch(client, "/foo/bar") + try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar") { @Override void reset() throws Exception @@ -83,18 +83,20 @@ public class TestLeaderLatch extends BaseClassForTests resetCount.incrementAndGet(); super.reset(); } - }; - latch.start(); - Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - - for ( int i = 0; i < 5; ++i ) + } ) { - server.stop(); - server.restart(); - timing.sleepABit(); + latch.start(); + Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + + for ( int i = 0; i < 5; ++i ) + { + server.stop(); + server.restart(); + timing.sleepABit(); + } + Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + Assert.assertEquals(resetCount.get(), 2); } - Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - Assert.assertEquals(resetCount.get(), 2); } } diff --git a/src/site/confluence/errors.confluence b/src/site/confluence/errors.confluence index b4f6643..97f23fd 100644 --- a/src/site/confluence/errors.confluence +++ b/src/site/confluence/errors.confluence @@ -19,7 +19,8 @@ in a retry mechanism. Thus, the following guarantees can be made: h2. Notifications Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection. -{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take +{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection +disruptions. Clients can monitor these changes and take appropriate action. These are the possible state changes: |CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.| diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence index 3a62fa5..1971c3c 100644 --- a/src/site/confluence/utilities.confluence +++ b/src/site/confluence/utilities.confluence @@ -14,6 +14,50 @@ Various static methods to help with using ZooKeeper ZNode paths: * getSortedChildren: Return the children of the given path sorted by sequence number * makePath: Given a parent path and a child node, create a combined full path +h2. Circuit Breaking ConnectionStateListener + +During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these +messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc. + +This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open" +(based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection +goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent. + +When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the +circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount. +While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state +changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener. +If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however, +the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is +forwarded to the managed listener. + +You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate +their ConnectionStateListeners using the configured decorator. E.g. + +{code} +ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...); +CuratorFramework client = CuratorFrameworkFactory.builder() + ... + .connectionStateListenerDecorator(decorator) + ... + .build(); +{code} + +If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}. + +{code} +CuratorFramework client ... +ConnectionStateListener listener = ... +ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener); + +... + +client.getConnectionStateListenable().addListener(decoratedListener); + +// later, to remove... +client.getConnectionStateListenable().removeListener(decoratedListener); +{code} + h2. Locker Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer:
