Repository: curator Updated Branches: refs/heads/CURATOR-126 [created] 785e9f6c8
CURATOR-126: Fix race condition in CuratorFrameworkImpl.close() Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/785e9f6c Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/785e9f6c Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/785e9f6c Branch: refs/heads/CURATOR-126 Commit: 785e9f6c8a528d0c07652450471dcb71a5717776 Parents: 15a0aac Author: Scott Blum <[email protected]> Authored: Mon Jul 28 14:10:37 2014 -0400 Committer: Scott Blum <[email protected]> Committed: Mon Jul 28 17:13:17 2014 -0400 ---------------------------------------------------------------------- .../framework/CuratorFrameworkFactory.java | 17 +++++++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 22 +++++++++++++++----- 2 files changed, 34 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 2d21fb7..8ef2580 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -52,6 +52,7 @@ public class CuratorFrameworkFactory private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory(); private static final DefaultACLProvider DEFAULT_ACL_PROVIDER = new DefaultACLProvider(); private static final long DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3); + private static final int DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1); /** * Return a new builder that builds a CuratorFramework @@ -101,6 +102,7 @@ public class CuratorFrameworkFactory private EnsembleProvider ensembleProvider; private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS; private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS; + private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS; private RetryPolicy retryPolicy; private ThreadFactory threadFactory = null; private String namespace; @@ -239,6 +241,16 @@ public class CuratorFrameworkFactory } /** + * @param maxCloseWaitMs time to wait during close to join background threads + * @return this + */ + public Builder maxCloseWaitMs(int maxCloseWaitMs) + { + this.maxCloseWaitMs = maxCloseWaitMs; + return this; + } + + /** * @param retryPolicy retry policy to use * @return this */ @@ -336,6 +348,11 @@ public class CuratorFrameworkFactory return connectionTimeoutMs; } + public int getMaxCloseWaitMs() + { + return maxCloseWaitMs; + } + public RetryPolicy getRetryPolicy() { return retryPolicy; http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 23a3248..7f7cc98 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -63,6 +63,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final ListenerContainer<CuratorListener> listeners; private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners; private final ThreadFactory threadFactory; + private final int maxCloseWaitMs; private final BlockingQueue<OperationAndData<?>> backgroundOperations; private final NamespaceImpl namespace; private final ConnectionStateManager connectionStateManager; @@ -127,6 +128,7 @@ public class CuratorFrameworkImpl implements CuratorFramework backgroundOperations = new DelayQueue<OperationAndData<?>>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); + maxCloseWaitMs = builder.getMaxCloseWaitMs(); connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); @@ -179,6 +181,7 @@ public class CuratorFrameworkImpl implements CuratorFramework listeners = parent.listeners; unhandledErrorListeners = parent.unhandledErrorListeners; threadFactory = parent.threadFactory; + maxCloseWaitMs = parent.maxCloseWaitMs; backgroundOperations = parent.backgroundOperations; connectionStateManager = parent.connectionStateManager; defaultData = parent.defaultData; @@ -297,15 +300,24 @@ public class CuratorFrameworkImpl implements CuratorFramework } }); + if ( executorService != null ) + { + executorService.shutdownNow(); + try + { + executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS); + } + catch ( InterruptedException e ) + { + // Interrupted while interrupting; I give up. + Thread.currentThread().interrupt(); + } + } listeners.clear(); unhandledErrorListeners.clear(); connectionStateManager.close(); client.close(); namespaceWatcherMap.close(); - if ( executorService != null ) - { - executorService.shutdownNow(); - } } } @@ -759,7 +771,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private void backgroundOperationsLoop() { - while ( !Thread.interrupted() ) + while ( !Thread.currentThread().isInterrupted() ) { OperationAndData<?> operationAndData; try
