This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new f18f0acf4 GH-1277: SafeNotifyService threads leak in
CuratorFrameworkImpl (#1278)
f18f0acf4 is described below
commit f18f0acf432231ece5dcd179020c4951dbe49803
Author: Istvan Fajth <[email protected]>
AuthorDate: Thu Dec 18 05:13:10 2025 +0100
GH-1277: SafeNotifyService threads leak in CuratorFrameworkImpl (#1278)
CURATOR-495 introduced a new runSafeService field in the
CuratorFrameworkImpl class. This field is either initialized with an
external ExecutorService supplied via the builder, or created internally
within the class.
However, the CuratorFrameworkImpl#close method never shuts this executor
down, so by default the threads started by each instance linger until the
VM exits.
Worse, if a thread factory that produces non-daemon threads is supplied to
the framework implementation via the builder, the VM never exits because
the single-thread executor is never stopped.
Fixes #1277.
---
.../curator/framework/CuratorFrameworkFactory.java | 3 ++
.../framework/imps/CuratorFrameworkImpl.java | 40 ++++++++++++++---
.../framework/imps/TestFrameworkBackground.java | 50 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 7 deletions(-)
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 87f0e9f17..33f23f7fb 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -509,6 +509,9 @@ public class CuratorFrameworkFactory {
* By default, an executor is allocated internally using the provided
(or default)
* {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use
this method
* to set a custom executor.
+ * Whenever a custom executor is set, it is the caller's
responsibility to close the
+ * executor after the CuratorFramework closure.
+ * The internally created executor is closed when CuratorFramework is
closed.
*
* @param runSafeService executor to use for calls to notifyAll from
Watcher callbacks etc
* @return this
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index ae4ba3a81..a9bcb53f7 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -100,6 +101,7 @@ public final class CuratorFrameworkImpl extends
CuratorFrameworkBase {
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final Executor runSafeService;
+ private boolean isExternalRunSafeService = false;
private final ZookeeperCompatibility zookeeperCompatibility;
private volatile ExecutorService executorService;
@@ -194,6 +196,7 @@ public final class CuratorFrameworkImpl extends
CuratorFrameworkBase {
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder
builder) {
if (builder.getRunSafeService() != null) {
+ isExternalRunSafeService = true;
return builder.getRunSafeService();
}
ThreadFactory threadFactory = builder.getThreadFactory();
@@ -373,16 +376,19 @@ public final class CuratorFrameworkImpl extends
CuratorFrameworkBase {
}
});
+ Optional<CompletableFuture<Void>> executorServiceClosure =
Optional.empty();
if (executorService != null) {
- executorService.shutdownNow();
- try {
- executorService.awaitTermination(maxCloseWaitMs,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // Interrupted while interrupting; I give up.
- Thread.currentThread().interrupt();
- }
+ executorServiceClosure =
Optional.of(shutdownAndAwaitTerminationAsync(executorService));
}
+ Optional<CompletableFuture<Void>> runSafeServiceClosure =
Optional.empty();
+ if (!isExternalRunSafeService && runSafeService != null) {
+ runSafeServiceClosure =
+
Optional.of(shutdownAndAwaitTerminationAsync(((ExecutorService)
runSafeService)));
+ }
+ executorServiceClosure.ifPresent(CompletableFuture::join);
+ runSafeServiceClosure.ifPresent(CompletableFuture::join);
+
if (ensembleTracker != null) {
ensembleTracker.close();
}
@@ -400,6 +406,26 @@ public final class CuratorFrameworkImpl extends
CuratorFrameworkBase {
}
}
+ /**
+ * Utility method to run the executor service shutdown in a background
thread.
+ * This is in order to ensure we don't extend the wait time above
maxCloseWaitMs by waiting on multiple
+ * executors to terminate.
+ *
+ * @param service the ExecutorService to shut down.
+ * @return the future represents the job closing the executor service and
waits on its termination.
+ */
+ private CompletableFuture<Void> shutdownAndAwaitTerminationAsync(final
ExecutorService service) {
+ return CompletableFuture.runAsync(() -> {
+ service.shutdownNow();
+ try {
+ service.awaitTermination(maxCloseWaitMs,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Interrupted while interrupting; I give up.
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+
NamespaceImpl getNamespaceImpl() {
return namespace;
}
diff --git
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 0d0da50e3..8035ecd97 100644
---
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -25,13 +25,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@@ -45,6 +49,7 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.ACL;
import org.junit.jupiter.api.Test;
@@ -306,4 +311,49 @@ public class TestFrameworkBackground extends
BaseClassForTests {
CloseableUtils.closeQuietly(client);
}
}
+
+ @Test
+ public void testCloseShutsDownInternalRunSafeService() {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
timing.connection(), new RetryOneTime(1));
+ client.start();
+ client.runSafe(() -> {});
+ assertTrue(enumerateThreads().anyMatch(t ->
t.getName().contains("SafeNotifyService")));
+
+ client.close();
+
+ assertTrue(enumerateThreads().noneMatch(t ->
t.getName().contains("SafeNotifyService")));
+ }
+
+ @Test
+ public void testCloseLeavesExternalRunSafeServiceRunning() throws
Exception {
+ Timing timing = new Timing();
+ ExecutorService externalRunner =
+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("ExternalSafeNotifyService"));
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .sessionTimeoutMs(timing.session())
+ .connectionTimeoutMs(timing.connection())
+ .retryPolicy(new RetryOneTime(1))
+ .maxCloseWaitMs(timing.forWaiting().milliseconds())
+ .runSafeService(externalRunner)
+ .build();
+ client.start();
+ client.runSafe(() -> {});
+ assertTrue(enumerateThreads().anyMatch(t ->
t.getName().contains("ExternalSafeNotifyService")));
+
+ client.close();
+
+ assertTrue(enumerateThreads().anyMatch(t ->
t.getName().contains("ExternalSafeNotifyService")));
+
+ externalRunner.shutdownNow();
+ assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS));
+ }
+
+ private static Stream<Thread> enumerateThreads() {
+ Thread[] threads = new Thread[Thread.activeCount()];
+ Thread.enumerate(threads);
+ return Arrays.stream(threads);
+ }
}