This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new f18f0acf4 GH-1277: SafeNotifyService threads leak in 
CuratorFrameWorkImpl (#1278)
f18f0acf4 is described below

commit f18f0acf432231ece5dcd179020c4951dbe49803
Author: Istvan Fajth <[email protected]>
AuthorDate: Thu Dec 18 05:13:10 2025 +0100

    GH-1277: SafeNotifyService threads leak in CuratorFrameWorkImpl (#1278)
    
    CURATOR-495 introduced a new runSafeService field in the CuratorFrameworkImpl
    class. This field is either initialized with an external ExecutorService
    supplied via the builder, or it is created internally within the class.
    
    However, the CuratorFrameworkImpl#close method never closes this executor,
    so, by default, the threads started by each instance linger until the VM
    exits.
    Worse, if someone passes the framework implementation a thread factory via
    the builder that produces non-daemon threads, the VM never exits, because
    the single-thread executor is never stopped.
    
    Fixes #1277.
---
 .../curator/framework/CuratorFrameworkFactory.java |  3 ++
 .../framework/imps/CuratorFrameworkImpl.java       | 40 ++++++++++++++---
 .../framework/imps/TestFrameworkBackground.java    | 50 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 7 deletions(-)

diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 87f0e9f17..33f23f7fb 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -509,6 +509,9 @@ public class CuratorFrameworkFactory {
          * By default, an executor is allocated internally using the provided 
(or default)
          * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use 
this method
          * to set a custom executor.
+         * Whenever a custom executor is set, it is the caller's 
responsibility to close the
+         * executor after the CuratorFramework closure.
+         * The internally created executor is closed when CuratorFramework is 
closed.
          *
          * @param runSafeService executor to use for calls to notifyAll from 
Watcher callbacks etc
          * @return this
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index ae4ba3a81..a9bcb53f7 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -100,6 +101,7 @@ public final class CuratorFrameworkImpl extends 
CuratorFrameworkBase {
     private final EnsembleTracker ensembleTracker;
     private final SchemaSet schemaSet;
     private final Executor runSafeService;
+    private boolean isExternalRunSafeService = false;
     private final ZookeeperCompatibility zookeeperCompatibility;
 
     private volatile ExecutorService executorService;
@@ -194,6 +196,7 @@ public final class CuratorFrameworkImpl extends 
CuratorFrameworkBase {
 
     private Executor makeRunSafeService(CuratorFrameworkFactory.Builder 
builder) {
         if (builder.getRunSafeService() != null) {
+            isExternalRunSafeService = true;
             return builder.getRunSafeService();
         }
         ThreadFactory threadFactory = builder.getThreadFactory();
@@ -373,16 +376,19 @@ public final class CuratorFrameworkImpl extends 
CuratorFrameworkBase {
                 }
             });
 
+            Optional<CompletableFuture<Void>> executorServiceClosure = 
Optional.empty();
             if (executorService != null) {
-                executorService.shutdownNow();
-                try {
-                    executorService.awaitTermination(maxCloseWaitMs, 
TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    // Interrupted while interrupting; I give up.
-                    Thread.currentThread().interrupt();
-                }
+                executorServiceClosure = 
Optional.of(shutdownAndAwaitTerminationAsync(executorService));
             }
 
+            Optional<CompletableFuture<Void>> runSafeServiceClosure = 
Optional.empty();
+            if (!isExternalRunSafeService && runSafeService != null) {
+                runSafeServiceClosure =
+                        
Optional.of(shutdownAndAwaitTerminationAsync(((ExecutorService) 
runSafeService)));
+            }
+            executorServiceClosure.ifPresent(CompletableFuture::join);
+            runSafeServiceClosure.ifPresent(CompletableFuture::join);
+
             if (ensembleTracker != null) {
                 ensembleTracker.close();
             }
@@ -400,6 +406,26 @@ public final class CuratorFrameworkImpl extends 
CuratorFrameworkBase {
         }
     }
 
+    /**
+     * Utility method to run the executor service shutdown in a background 
thread.
+     * This is in order to ensure we don't extend the wait time above 
maxCloseWaitMs by waiting on multiple
+     * executors to terminate.
+     *
+     * @param service the ExecutorService to shut down.
+     * @return the future represents the job closing the executor service and 
waits on its termination.
+     */
+    private CompletableFuture<Void> shutdownAndAwaitTerminationAsync(final 
ExecutorService service) {
+        return CompletableFuture.runAsync(() -> {
+            service.shutdownNow();
+            try {
+                service.awaitTermination(maxCloseWaitMs, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Interrupted while interrupting; I give up.
+                Thread.currentThread().interrupt();
+            }
+        });
+    }
+
     NamespaceImpl getNamespaceImpl() {
         return namespace;
     }
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 0d0da50e3..8035ecd97 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -25,13 +25,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -45,6 +49,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.ACL;
 import org.junit.jupiter.api.Test;
@@ -306,4 +311,49 @@ public class TestFrameworkBackground extends 
BaseClassForTests {
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testCloseShutsDownInternalRunSafeService() {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        client.runSafe(() -> {});
+        assertTrue(enumerateThreads().anyMatch(t -> 
t.getName().contains("SafeNotifyService")));
+
+        client.close();
+
+        assertTrue(enumerateThreads().noneMatch(t -> 
t.getName().contains("SafeNotifyService")));
+    }
+
+    @Test
+    public void testCloseLeavesExternalRunSafeServiceRunning() throws 
Exception {
+        Timing timing = new Timing();
+        ExecutorService externalRunner =
+                
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("ExternalSafeNotifyService"));
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .sessionTimeoutMs(timing.session())
+                .connectionTimeoutMs(timing.connection())
+                .retryPolicy(new RetryOneTime(1))
+                .maxCloseWaitMs(timing.forWaiting().milliseconds())
+                .runSafeService(externalRunner)
+                .build();
+        client.start();
+        client.runSafe(() -> {});
+        assertTrue(enumerateThreads().anyMatch(t -> 
t.getName().contains("ExternalSafeNotifyService")));
+
+        client.close();
+
+        assertTrue(enumerateThreads().anyMatch(t -> 
t.getName().contains("ExternalSafeNotifyService")));
+
+        externalRunner.shutdownNow();
+        assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS));
+    }
+
+    private static Stream<Thread> enumerateThreads() {
+        Thread[] threads = new Thread[Thread.activeCount()];
+        Thread.enumerate(threads);
+        return Arrays.stream(threads);
+    }
 }

Reply via email to