This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-505 in repository https://gitbox.apache.org/repos/asf/curator.git
commit a162f5b4606277e87f93b93e47c32237490846e1 Author: randgalt <[email protected]> AuthorDate: Thu Feb 7 12:58:41 2019 -0500 CURATOR-505 - decoration of ConnectionStateListeners is now automatic (a backdoor is provided) --- .../apache/curator/framework/CuratorFramework.java | 20 ---- .../framework/imps/CuratorFrameworkImpl.java | 29 ++---- .../curator/framework/imps/EnsembleTracker.java | 6 ++ .../framework/listen/MappingListenerContainer.java | 112 +++++++++++++++++++++ .../framework/state/ConnectionStateListener.java | 15 ++- .../framework/state/ConnectionStateManager.java | 35 ++++--- .../curator/framework/recipes/cache/NodeCache.java | 56 +++++------ .../framework/recipes/cache/PathChildrenCache.java | 10 +- .../curator/framework/recipes/cache/TreeCache.java | 11 +- .../framework/recipes/leader/LeaderLatch.java | 11 +- .../framework/recipes/leader/LeaderSelector.java | 8 +- .../framework/recipes/nodes/PersistentNode.java | 21 ++-- .../framework/recipes/shared/SharedValue.java | 40 ++++---- .../x/discovery/details/ServiceCacheImpl.java | 109 ++++++++++---------- .../x/discovery/details/ServiceDiscoveryImpl.java | 45 +++++---- src/site/confluence/errors.confluence | 3 +- src/site/confluence/utilities.confluence | 18 +--- 17 files changed, 321 insertions(+), 228 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 2657781..3737faa 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -30,7 +30,6 @@ import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.schema.SchemaSet; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.framework.state.ConnectionStateListenerDecorator; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -358,23 +357,4 @@ public interface CuratorFramework extends Closeable * @since 4.1.0 */ CompletableFuture<Void> runSafe(Runnable runnable); - - /** - * Uses the configured {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator} - * to decorate the given listener. You should always decorate connection state listeners via - * this method. See the Curator recipes for examples. - * - * @param actual listener to decorate - * @return decorated listener - */ - ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual); - - /** - * Returns a facade of the current instance that uses the given connection state listener - * decorator instead of the configured one - * - * @param newDecorator decorator to use - * @return facade - */ - CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index f210021..d9c3424 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -41,7 +41,6 @@ import org.apache.curator.framework.schema.SchemaSet; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.framework.state.ConnectionStateListenerDecorator; import org.apache.curator.framework.state.ConnectionStateManager; import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; @@ -91,7 +90,6 @@ public class CuratorFrameworkImpl implements CuratorFramework private final SchemaSet schemaSet; private final boolean zk34CompatibilityMode; private final Executor runSafeService; - private final ConnectionStateListenerDecorator connectionStateListenerDecorator; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -141,7 +139,7 @@ public class CuratorFrameworkImpl implements CuratorFramework namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); @@ -149,7 +147,6 @@ public class CuratorFrameworkImpl implements CuratorFramework connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null"); schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null"); zk34CompatibilityMode = builder.isZk34CompatibilityMode(); - connectionStateListenerDecorator = builder.getConnectionStateListenerDecorator(); byte[] builderDefaultData = builder.getDefaultData(); defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0]; @@ -236,11 +233,6 @@ public class CuratorFrameworkImpl implements CuratorFramework protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { - this(parent, parent.connectionStateListenerDecorator); - } - - private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator) - { client = parent.client; listeners = parent.listeners; unhandledErrorListeners = parent.unhandledErrorListeners; @@ -265,7 +257,6 @@ public class CuratorFrameworkImpl implements CuratorFramework zk34CompatibilityMode = parent.zk34CompatibilityMode; ensembleTracker = null; runSafeService = parent.runSafeService; - this.connectionStateListenerDecorator = connectionStateListenerDecorator; } @Override @@ -334,6 +325,12 @@ public class CuratorFrameworkImpl implements CuratorFramework logAsErrorConnectionErrors.set(true); } } + + @Override + public boolean doNotDecorate() + { + return true; + } }; this.getConnectionStateListenable().addListener(listener); @@ -598,18 +595,6 @@ public class CuratorFrameworkImpl implements CuratorFramework return schemaSet; } - @Override - public ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual) - { - return connectionStateListenerDecorator.decorateListener(this, actual); - } - - @Override - public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator) - { - return new CuratorFrameworkImpl(this, newDecorator); - } - ACLProvider getAclProvider() { return aclProvider; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java index 7d8fe19..8ca63f6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -72,6 +72,12 @@ public class EnsembleTracker implements Closeable, CuratorWatcher } } } + + @Override + public boolean doNotDecorate() + { + return true; + } }; private enum State diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java new file mode 100644 index 0000000..3a37ecb --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java @@ -0,0 +1,112 @@ +package org.apache.curator.framework.listen; + +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.curator.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that + * doesn't leak Guava's internals and also supports mapping/wrapping of listeners + */ +public class MappingListenerContainer<K, V> implements Listenable<K> +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Map<K, ListenerEntry<V>> listeners = Maps.newConcurrentMap(); + private final Function<K, V> mapper; + + /** + * Returns a new standard version that does no mapping + * + * @return new container + */ + public static <T> MappingListenerContainer<T, T> nonMapping() + { + return new MappingListenerContainer<>(Function.identity()); + } + + /** + * Returns a new container that wraps listeners using the given mapper + * + * @param mapper listener mapper/wrapper + * @return new container + */ + public static <K, V> MappingListenerContainer<K, V> mapping(Function<K, V> mapper) + { + return new MappingListenerContainer<>(mapper); + } + + @Override + public void addListener(K listener) + { + addListener(listener, MoreExecutors.directExecutor()); + } + + @Override + public void addListener(K listener, Executor executor) + { + V mapped = mapper.apply(listener); + listeners.put(listener, new ListenerEntry<V>(mapped, executor)); + } + + @Override + public void removeListener(K listener) + { + if ( listener != null ) + { + listeners.remove(listener); + } + } + + /** + * Remove all listeners + */ + public void clear() + { + listeners.clear(); + } + + /** + * Return the number of listeners + * + * @return number + */ + public int size() + { + return listeners.size(); + } + + /** + * Utility - apply the given function to each listener. The function receives + * the listener as an argument. + * + * @param function function to call for each listener + */ + public void forEach(Consumer<V> function) + { + for ( ListenerEntry<V> entry : listeners.values() ) + { + entry.executor.execute(() -> { + try + { + function.accept(entry.listener); + } + catch ( Throwable e ) + { + ThreadUtils.checkInterrupted(e); + log.error(String.format("Listener (%s) threw an exception", entry.listener), e); + } + }); + } + } + + private MappingListenerContainer(Function<K, V> mapper) + { + this.mapper = mapper; + } +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java index 075e6ec..71635d0 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java @@ -28,5 +28,18 @@ public interface ConnectionStateListener * @param client the client * @param newState the new state */ - public void stateChanged(CuratorFramework client, ConnectionState newState); + void stateChanged(CuratorFramework client, ConnectionState newState); + + /** + * Normally, ConnectionStateListeners are decorated via the configured + * {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain + * critical cases, however, this is not desired. If your listener returns <code>true</code> + * for doNotDecorate(), it will not be passed through the decorator. + * + * @return true/false + */ + default boolean doNotDecorate() + { + return false; + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 5e28b3d..3654f61 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -19,10 +19,10 @@ package org.apache.curator.framework.state; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.MappingListenerContainer; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; @@ -68,10 +68,10 @@ public class ConnectionStateManager implements Closeable private final CuratorFramework client; private final int sessionTimeoutMs; private final int sessionExpirationPercent; - private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final MappingListenerContainer<ConnectionStateListener, ConnectionStateListener> listeners; // guarded by sync private ConnectionState currentConnectionState; @@ -93,6 +93,18 @@ public class ConnectionStateManager implements Closeable */ public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent) { + this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerDecorator.standard); + } + + /** + * @param client the client + * @param threadFactory thread factory to use or null for a default + * @param sessionTimeoutMs the ZK session timeout in milliseconds + * @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all + * @param connectionStateListenerDecorator the decorator to use + */ + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) + { this.client = client; this.sessionTimeoutMs = sessionTimeoutMs; this.sessionExpirationPercent = sessionExpirationPercent; @@ -101,6 +113,7 @@ public class ConnectionStateManager implements Closeable threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); } service = Executors.newSingleThreadExecutor(threadFactory); + listeners = MappingListenerContainer.mapping(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener)); } /** @@ -138,8 +151,9 @@ public class ConnectionStateManager implements Closeable * Return the listenable * * @return listenable + * @since 4.2.0 return type has changed from ListenerContainer to Listenable */ - public ListenerContainer<ConnectionStateListener> getListenable() + public Listenable<ConnectionStateListener> getListenable() { return listeners; } @@ -263,18 +277,7 @@ public class ConnectionStateManager implements Closeable log.warn("There are no ConnectionStateListeners registered."); } - listeners.forEach - ( - new Function<ConnectionStateListener, Void>() - { - @Override - public Void apply(ConnectionStateListener listener) - { - listener.stateChanged(client, newState); - return null; - } - } - ); + listeners.forEach(listener -> listener.stateChanged(client, newState)); } else if ( sessionExpirationPercent > 0 ) { diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java index 1ba88c3..9687e1b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java @@ -64,7 +64,32 @@ public class NodeCache implements Closeable private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>(); private final AtomicBoolean isConnected = new AtomicBoolean(true); - private volatile ConnectionStateListener connectionStateListener; + private ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) + { + if ( isConnected.compareAndSet(false, true) ) + { + try + { + reset(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.error("Trying to reset after reconnection", e); + } + } + } + else + { + isConnected.set(false); + } + } + }; private Watcher watcher = new Watcher() { @@ -118,8 +143,6 @@ public class NodeCache implements Closeable this.client = client.newWatcherRemoveCuratorFramework(); this.path = PathUtils.validatePath(path); this.dataIsCompressed = dataIsCompressed; - - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } public CuratorFramework getClient() @@ -173,7 +196,7 @@ public class NodeCache implements Closeable // has something to do with Guava's cache and circular references connectionStateListener = null; watcher = null; - } + } } /** @@ -325,7 +348,7 @@ public class NodeCache implements Closeable } } } - + /** * Default behavior is just to log the exception * @@ -335,27 +358,4 @@ public class NodeCache implements Closeable { log.error("", e); } - - private void handleStateChange(ConnectionState newState) - { - if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) - { - if ( isConnected.compareAndSet(false, true) ) - { - try - { - reset(); - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - log.error("Trying to reset after reconnection", e); - } - } - } - else - { - isConnected.set(false); - } - } } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 14608ba..bdc73cc 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -128,7 +128,14 @@ public class PathChildrenCache implements Closeable @VisibleForTesting volatile Exchanger<Object> rebuildTestExchanger; - private volatile ConnectionStateListener connectionStateListener; + private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; public static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); /** @@ -218,7 +225,6 @@ public class PathChildrenCache implements Closeable this.dataIsCompressed = dataIsCompressed; this.executorService = executorService; ensureContainers = new EnsureContainers(client, path); - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } /** diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index a3b1d23..f42c1d5 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -534,7 +534,15 @@ public class TreeCache implements Closeable private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>(); private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>(); private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT); - private final ConnectionStateListener connectionStateListener; + + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache"); @@ -578,7 +586,6 @@ public class TreeCache implements Closeable this.maxDepth = maxDepth; this.disableZkWatches = disableZkWatches; this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null"); - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } /** diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 22cf3af..bb8aa73 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -74,7 +74,15 @@ public class LeaderLatch implements Closeable private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>(); private final CloseMode closeMode; private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>(); - private final ConnectionStateListener listener; + + private final ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; private static final String LOCK_NAME = "latch-"; @@ -141,7 +149,6 @@ public class LeaderLatch implements Closeable this.latchPath = PathUtils.validatePath(latchPath); this.id = Preconditions.checkNotNull(id, "id cannot be null"); this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); - listener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } /** diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java index 108d66e..0bb448a 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java @@ -26,7 +26,6 @@ import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; @@ -69,7 +68,6 @@ public class LeaderSelector implements Closeable private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFramework client; private final LeaderSelectorListener listener; - private final ConnectionStateListener connectionStateListener; private final CloseableExecutorService executorService; private final InterProcessMutex mutex; private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); @@ -147,8 +145,6 @@ public class LeaderSelector implements Closeable this.listener = new WrappedListener(this, listener); hasLeadership = false; - connectionStateListener = client.decorateConnectionStateListener(listener); - this.executorService = executorService; mutex = new InterProcessMutex(client, leaderPath) { @@ -219,7 +215,7 @@ public class LeaderSelector implements Closeable Preconditions.checkState(!executorService.isShutdown(), "Already started"); Preconditions.checkState(!hasLeadership, "Already has leadership"); - client.getConnectionStateListenable().addListener(connectionStateListener); + client.getConnectionStateListenable().addListener(listener); requeue(); } @@ -275,7 +271,7 @@ public class LeaderSelector implements Closeable { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); - client.getConnectionStateListenable().removeListener(connectionStateListener); + client.getConnectionStateListenable().removeListener(listener); executorService.close(); ourTask.set(null); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java index 293f46e..81e8dd9 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java @@ -145,7 +145,17 @@ public class PersistentNode implements Closeable } } }; - private final ConnectionStateListener connectionStateListener; + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework dummy, ConnectionState newState) + { + if ( (newState == ConnectionState.RECONNECTED) && isActive() ) + { + createNode(); + } + } + }; @VisibleForTesting volatile CountDownLatch debugCreateNodeLatch = null; @@ -203,7 +213,6 @@ public class PersistentNode implements Closeable }; this.data.set(Arrays.copyOf(data, data.length)); - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } private void processBackgroundCallbackClosedState(CuratorEvent event) @@ -545,12 +554,4 @@ public class PersistentNode implements Closeable { return authFailure.get(); } - - private void handleStateChange(ConnectionState newState) - { - if ( (newState == ConnectionState.RECONNECTED) && isActive() ) - { - createNode(); - } - } } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index 5f3e183..5d7abce 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -73,7 +73,26 @@ public class SharedValue implements Closeable, SharedValueReader } }; - private final ConnectionStateListener connectionStateListener; + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + notifyListenerOfStateChanged(newState); + if ( newState.isConnected() ) + { + try + { + readValueAndNotifyListenersInBackground(); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + log.error("Could not read value after reconnect", e); + } + } + } + }; private enum State { @@ -94,7 +113,6 @@ public class SharedValue implements Closeable, SharedValueReader this.seedValue = Arrays.copyOf(seedValue, seedValue.length); this.watcher = new SharedValueCuratorWatcher(); currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } @VisibleForTesting @@ -106,7 +124,6 @@ public class SharedValue implements Closeable, SharedValueReader // inject watcher for testing this.watcher = watcher; currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } @Override @@ -317,21 +334,4 @@ public class SharedValue implements Closeable, SharedValueReader } ); } - - private void handleStateChange(ConnectionState newState) - { - notifyListenerOfStateChanged(newState); - if ( newState.isConnected() ) - { - try - { - readValueAndNotifyListenersInBackground(); - } - catch ( Exception e ) - { - ThreadUtils.checkInterrupted(e); - log.error("Could not read value after reconnect", e); - } - } - } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index 4270116..d1a31ad 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.curator.x.discovery.details; import com.google.common.annotations.VisibleForTesting; @@ -24,15 +23,14 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.CloseableExecutorService; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceInstance; @@ -47,16 +45,17 @@ import java.util.concurrent.atomic.AtomicReference; public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener { - private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); - private final ServiceDiscoveryImpl<T> discovery; - private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); - private final PathChildrenCache cache; - private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); - private final ConcurrentMap<ServiceCacheListener, ConnectionStateListener> connectionStateListeners = Maps.newConcurrentMap(); + private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>(); + private final ServiceDiscoveryImpl<T> discovery; + private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + private final PathChildrenCache cache; + private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap(); private enum State { - LATENT, STARTED, STOPPED + LATENT, + STARTED, + STOPPED } private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) @@ -124,15 +123,18 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi { Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started"); - listenerContainer.forEach(new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) - { - discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener)); - return null; - } - }); + listenerContainer.forEach + ( + new Function<ServiceCacheListener, Void>() + { + @Override + public Void apply(ServiceCacheListener listener) + { + discovery.getClient().getConnectionStateListenable().removeListener(listener); + return null; + } + } + ); listenerContainer.clear(); CloseableUtils.closeQuietly(cache); @@ -144,56 +146,59 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi public void addListener(ServiceCacheListener listener) { listenerContainer.addListener(listener); - discovery.getClient().getConnectionStateListenable().addListener(wrap(listener)); + discovery.getClient().getConnectionStateListenable().addListener(listener); } @Override public void addListener(ServiceCacheListener listener, Executor executor) { listenerContainer.addListener(listener, executor); - discovery.getClient().getConnectionStateListenable().addListener(wrap(listener), executor); + discovery.getClient().getConnectionStateListenable().addListener(listener, executor); } @Override public void removeListener(ServiceCacheListener listener) { listenerContainer.removeListener(listener); - discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener)); + discovery.getClient().getConnectionStateListenable().removeListener(listener); } @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - boolean notifyListeners = false; + boolean notifyListeners = false; switch ( event.getType() ) { - case CHILD_ADDED: - case CHILD_UPDATED: - { - addInstance(event.getData(), false); - notifyListeners = true; - break; - } + case CHILD_ADDED: + case CHILD_UPDATED: + { + addInstance(event.getData(), false); + notifyListeners = true; + break; + } - case CHILD_REMOVED: - { - instances.remove(instanceIdFromData(event.getData())); - notifyListeners = true; - break; - } + case CHILD_REMOVED: + { + instances.remove(instanceIdFromData(event.getData())); + notifyListeners = true; + break; + } } if ( notifyListeners ) { - listenerContainer.forEach(new Function<ServiceCacheListener, Void>() - { - @Override - public Void apply(ServiceCacheListener listener) + listenerContainer.forEach + ( + new Function<ServiceCacheListener, Void>() { - listener.cacheChanged(); - return null; + @Override + public Void apply(ServiceCacheListener listener) + { + listener.cacheChanged(); + return null; + } } - }); + ); } } @@ -204,8 +209,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception { - String instanceId = instanceIdFromData(childData); - ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); + String instanceId = instanceIdFromData(childData); + ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData()); if ( onlyIfAbsent ) { instances.putIfAbsent(instanceId, serviceInstance); @@ -216,16 +221,4 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi } cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion()); } - - private ConnectionStateListener wrap(ServiceCacheListener listener) - { - ConnectionStateListener wrapped = discovery.getClient().decorateConnectionStateListener(listener); - connectionStateListeners.put(listener, wrapped); - return wrapped; - } - - private ConnectionStateListener unwrap(ServiceCacheListener listener) - { - return connectionStateListeners.remove(listener); - } } diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java index 2e10095..476705c 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java @@ -65,7 +65,29 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap()); private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap()); private final boolean watchInstances; - private final ConnectionStateListener connectionStateListener; + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) ) + { + try + { + log.debug("Re-registering due to reconnection"); + reRegisterServices(); + } + catch (InterruptedException ex) + { + Thread.currentThread().interrupt(); + } + catch ( Exception e ) + { + log.error("Could not re-register instances after reconnection", e); + } + } + } + }; private static class Entry<T> { @@ -97,7 +119,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> entry.cache = makeNodeCache(thisInstance); services.put(thisInstance.getId(), entry); } - connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState)); } /** @@ -509,24 +530,4 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> } } } - - private void handleStateChange(ConnectionState newState) - { - if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) ) - { - try - { - log.debug("Re-registering due to reconnection"); - reRegisterServices(); - } - catch (InterruptedException ex) - { - Thread.currentThread().interrupt(); - } - catch ( Exception e ) - { - log.error("Could not re-register instances after reconnection", e); - } - } - } } diff --git a/src/site/confluence/errors.confluence b/src/site/confluence/errors.confluence index 97f23fd..b4f6643 100644 --- a/src/site/confluence/errors.confluence +++ b/src/site/confluence/errors.confluence @@ -19,8 +19,7 @@ in a retry mechanism. Thus, the following guarantees can be made: h2. Notifications Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection. -{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection -disruptions. Clients can monitor these changes and take +{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take appropriate action. These are the possible state changes: |CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.| diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence index 1971c3c..720d8d9 100644 --- a/src/site/confluence/utilities.confluence +++ b/src/site/confluence/utilities.confluence @@ -31,8 +31,7 @@ If the connection has not been restored, the RetryPolicy is checked again. If th the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is forwarded to the managed listener. -You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate -their ConnectionStateListeners using the configured decorator. E.g. +You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. E.g. {code} ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...); @@ -43,21 +42,6 @@ CuratorFramework client = CuratorFrameworkFactory.builder() .build(); {code} -If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}. - -{code} -CuratorFramework client ... -ConnectionStateListener listener = ... -ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener); - -... - -client.getConnectionStateListenable().addListener(decoratedListener); - -// later, to remove... -client.getConnectionStateListenable().removeListener(decoratedListener); -{code} - h2. Locker Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer:
