Repository: curator Updated Branches: refs/heads/CURATOR-495 [created] f91adb22e
CURATOR-495 Fixes race in many Curator recipes whereby a pattern was used that called "notifyAll" in a synchronized block in response to a ZooKeeper watcher callback. This created a race and possible deadlock if the recipe instance was already in a synchronized block. This would result in the ZK event thread getting blocked which would prevent ZK connections from getting repaired. This change adds a new executor (available from CuratorFramework) that can be used to do the sync/notify so that ZK's event thread is not blocked. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f91adb22 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f91adb22 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f91adb22 Branch: refs/heads/CURATOR-495 Commit: f91adb22e83d8e47b99ad98f8c13c86251bc4cb3 Parents: 22c028f Author: randgalt <[email protected]> Authored: Thu Dec 13 20:34:40 2018 -0500 Committer: randgalt <[email protected]> Committed: Thu Dec 13 20:34:40 2018 -0500 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 35 ++++++++++++++++---- .../framework/CuratorFrameworkFactory.java | 27 ++++++++++++++- .../framework/imps/CuratorFrameworkImpl.java | 25 ++++++++++++++ .../recipes/barriers/DistributedBarrier.java | 7 +--- .../barriers/DistributedDoubleBarrier.java | 13 ++++---- .../recipes/locks/InterProcessSemaphoreV2.java | 14 +++----- .../framework/recipes/locks/LockInternals.java | 11 ++---- 7 files changed, 94 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index bf6167c..02c458a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -25,16 +25,14 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransaction; import org.apache.curator.framework.api.transaction.TransactionOp; -import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.schema.SchemaSet; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; - import java.io.Closeable; import java.util.concurrent.TimeUnit; @@ -269,7 +267,7 @@ public interface CuratorFramework extends Closeable * Call this method on watchers you are no longer interested in. * * @param watcher the watcher - * + * * @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references * when they are no longer used.
If you write your own recipe, follow the example of Curator * recipes and use {@link #newWatcherRemoveCuratorFramework} calling {@link WatcherRemoveCuratorFramework#removeWatchers()} @@ -277,7 +275,7 @@ public interface CuratorFramework extends Closeable */ @Deprecated public void clearWatcherReferences(Watcher watcher); - + /** * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely @@ -286,7 +284,7 @@ public interface CuratorFramework extends Closeable * @throws InterruptedException If interrupted while waiting */ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException; - + /** * Block until a connection to ZooKeeper is available. This method will not return until a * connection is available or it is interrupted, in which case an InterruptedException will @@ -331,4 +329,29 @@ public interface CuratorFramework extends Closeable * @return true/false */ boolean isZk34CompatibilityMode(); + + /** + * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is + * done from the {@link #runSafe(Runnable)} thread. + * + * @param monitorHolder object to sync on and notify + * @since 4.1.0 + */ + default void postSafeNotify(Object monitorHolder) + { + runSafe(() -> { + synchronized(monitorHolder) { + monitorHolder.notifyAll(); + } + }); + } + + /** + * Curator (and user) recipes can use this to run notifyAll + * and other blocking calls that might normally block ZooKeeper's event thread.
+ * + * @param runnable proc to call from a safe internal thread + * @since 4.1.0 + */ + void runSafe(Runnable runnable); } http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index f3daeab..86fbfce 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -47,6 +47,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.curator.CuratorZookeeperClient; @@ -149,6 +150,8 @@ public class CuratorFrameworkFactory private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet(); private boolean zk34CompatibilityMode = isZK34(); private int waitForShutdownTimeoutMs = 0; + private Executor runSafeService = null; + /** * Apply the current values and build a new CuratorFramework * @@ -189,7 +192,7 @@ public class CuratorFrameworkFactory /** * Add connection authorization - * + * * Subsequent calls to this method overwrite the prior calls. * * @param scheme the scheme @@ -474,6 +477,28 @@ public class CuratorFrameworkFactory return this; } + /** + * Curator (and user) recipes will use this executor to call notifyAll + * and other blocking calls that might normally block ZooKeeper's event thread. + * By default, an executor is allocated internally using the provided (or default) + * {@link #threadFactory(java.util.concurrent.ThreadFactory)}.
Use this method + * to set a custom executor. + * + * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc + * @return this + * @since 4.1.0 + */ + public Builder runSafeService(Executor runSafeService) + { + this.runSafeService = runSafeService; + return this; + } + + public Executor getRunSafeService() + { + return runSafeService; + } + public ACLProvider getAclProvider() { return aclProvider; http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 1ae6a5e..34002a0 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -96,6 +97,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final EnsembleTracker ensembleTracker; private final SchemaSet schemaSet; private final boolean zk34CompatibilityMode; + private final Executor runSafeService; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -163,6 +165,22 @@ public class CuratorFrameworkImpl implements CuratorFramework namespaceFacadeCache = new NamespaceFacadeCache(this); ensembleTracker = zk34CompatibilityMode ?
null : new EnsembleTracker(this, builder.getEnsembleProvider()); + + runSafeService = makeRunSafeService(builder); + } + + private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) + { + if ( builder.getRunSafeService() != null ) + { + return builder.getRunSafeService(); + } + ThreadFactory threadFactory = builder.getThreadFactory(); + if ( threadFactory == null ) + { + threadFactory = ThreadUtils.newThreadFactory("SafeNotifyService"); + } + return Executors.newSingleThreadExecutor(threadFactory); } private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder) @@ -176,6 +194,12 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override + public void runSafe(Runnable runnable) + { + runSafeService.execute(runnable); + } + + @Override public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() { return new WatcherRemovalFacade(this); @@ -240,6 +264,7 @@ public class CuratorFrameworkImpl implements CuratorFramework schemaSet = parent.schemaSet; zk34CompatibilityMode = parent.zk34CompatibilityMode; ensembleTracker = null; + runSafeService = parent.runSafeService; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java index 8a376f1..fb00ed9 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java @@ -44,7 +44,7 @@ public class DistributedBarrier @Override public void process(WatchedEvent event) { - notifyFromWatcher(); +
client.postSafeNotify(DistributedBarrier.this); } }; @@ -142,9 +142,4 @@ public class DistributedBarrier } return result; } - - private synchronized void notifyFromWatcher() - { - notifyAll(); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java index b3bdf2c..2315178 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java @@ -65,7 +65,12 @@ public class DistributedDoubleBarrier public void process(WatchedEvent event) { connectionLost.set(event.getState() != Event.KeeperState.SyncConnected); - notifyFromWatcher(); + client.runSafe(() -> { + synchronized(DistributedDoubleBarrier.this) { + hasBeenNotified.set(true); + DistributedDoubleBarrier.this.notifyAll(); + } + }); } }; @@ -337,10 +342,4 @@ public class DistributedDoubleBarrier return result; } - - private synchronized void notifyFromWatcher() - { - hasBeenNotified.set(true); - notifyAll(); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java index 03e1088..6404888 100644 --- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java @@ -40,7 +40,6 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -88,7 +87,7 @@ public class InterProcessSemaphoreV2 @Override public void process(WatchedEvent event) { - notifyFromWatcher(); + client.postSafeNotify(InterProcessSemaphoreV2.this); } }; @@ -141,7 +140,7 @@ public class InterProcessSemaphoreV2 public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { InterProcessSemaphoreV2.this.maxLeases = newCount; - notifyFromWatcher(); + client.postSafeNotify(InterProcessSemaphoreV2.this); } @Override @@ -373,7 +372,7 @@ public class InterProcessSemaphoreV2 synchronized(this) { for(;;) - { + { List<String> children; try { @@ -392,7 +391,7 @@ public class InterProcessSemaphoreV2 log.error("Sequential path not found: " + path); return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; } - + if ( children.size() <= maxLeases ) { break; @@ -479,9 +478,4 @@ public class InterProcessSemaphoreV2 } }; } - - private synchronized void notifyFromWatcher() - { - notifyAll(); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java index 46820af..a22bfb1 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java +++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java @@ -66,7 +66,7 @@ public class LockInternals @Override public void process(WatchedEvent event) { - notifyFromWatcher(); + client.postSafeNotify(LockInternals.this); } }; @@ -295,7 +295,7 @@ public class LockInternals synchronized(this) { - try + try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); @@ -316,7 +316,7 @@ public class LockInternals wait(); } } - catch ( KeeperException.NoNodeException e ) + catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } @@ -351,9 +351,4 @@ public class LockInternals // ignore - already deleted (possibly expired session, etc.) } } - - private synchronized void notifyFromWatcher() - { - notifyAll(); - } }
