This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 83de5488 CURATOR-640: Run PathChildrenCache's & TreeCache's default 
thread isolated  (#419)
83de5488 is described below

commit 83de54888d39173acd7ddad0435f955e4ba736b1
Author: tison <[email protected]>
AuthorDate: Wed Jun 15 19:43:13 2022 +0800

    CURATOR-640: Run PathChildrenCache's & TreeCache's default thread isolated  
(#419)
    
    * CURATOR-640: Run PathChildrenCache default thread isolated
    
    Signed-off-by: tison <[email protected]>
---
 .../framework/recipes/cache/PathChildrenCache.java |  7 ++--
 .../curator/framework/recipes/cache/TreeCache.java |  7 ++--
 .../recipes/cache/TestPathChildrenCache.java       | 41 ++++++++++++++++++++
 .../framework/recipes/cache/TestTreeCache.java     | 44 ++++++++++++++++++++++
 4 files changed, 93 insertions(+), 6 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 73930fd4..9121a49f 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -25,6 +25,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.function.Supplier;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
@@ -138,7 +139,7 @@ public class PathChildrenCache implements Closeable
             handleStateChange(newState);
         }
     };
-    public static final ThreadFactory defaultThreadFactory = 
ThreadUtils.newThreadFactory("PathChildrenCache");
+    public static final Supplier<ThreadFactory> defaultThreadFactorySupplier = 
() -> ThreadUtils.newThreadFactory("PathChildrenCache");
 
     /**
      * @param client the client
@@ -150,7 +151,7 @@ public class PathChildrenCache implements Closeable
     @SuppressWarnings("deprecation")
     public PathChildrenCache(CuratorFramework client, String path, 
PathChildrenCacheMode mode)
     {
-        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, 
false, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
 true));
+        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, 
false, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()),
 true));
     }
 
     /**
@@ -174,7 +175,7 @@ public class PathChildrenCache implements Closeable
      */
     public PathChildrenCache(CuratorFramework client, String path, boolean 
cacheData)
     {
-        this(client, path, cacheData, false, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
 true));
+        this(client, path, cacheData, false, new 
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()),
 true));
     }
 
     /**
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 13b9c595..5e7deb76 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.util.function.Supplier;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -108,7 +109,7 @@ public class TreeCache implements Closeable
             ExecutorService executor = executorService;
             if ( executor == null )
             {
-                executor = 
Executors.newSingleThreadExecutor(defaultThreadFactory);
+                executor = 
Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get());
             }
             return new TreeCache(client, path, cacheData, dataIsCompressed, 
maxDepth, executor, createParentNodes, disableZkWatches, selector);
         }
@@ -554,7 +555,7 @@ public class TreeCache implements Closeable
         }
     };
 
-    static final ThreadFactory defaultThreadFactory = 
ThreadUtils.newThreadFactory("TreeCache");
+    private static final Supplier<ThreadFactory> defaultThreadFactorySupplier 
= () -> ThreadUtils.newThreadFactory("TreeCache");
 
     /**
      * Create a TreeCache for the given client and path with default options.
@@ -571,7 +572,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, 
Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new 
DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, 
Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()), false, 
false, new DefaultTreeCacheSelector());
     }
 
     /**
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 3790b34c..89dd2fc5 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -1151,4 +1151,45 @@ public class TestPathChildrenCache extends 
BaseClassForTests
             TestCleanState.closeAndTestClean(client);
         }
     }
+
+    @Test
+    public void testIsolatedThreadGroup() throws Exception {
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new 
RetryOneTime(1));
+        client.getUnhandledErrorListenable().addListener((message, e) -> 
exception.set(e));
+        client.start();
+
+        ThreadGroup threadGroup1 = new ThreadGroup("testGroup1");
+        Thread thread1 = new Thread(threadGroup1, () -> {
+            try ( final PathChildrenCache cache = new 
PathChildrenCache(client, "/test1", true) ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread1.start();
+        thread1.join();
+        assertNull(exception.get());
+
+        // After the thread group is destroyed, all PathChildrenCache instances
+        // will fail to start due to inability to add new threads into the 
first thread group
+        threadGroup1.destroy();
+
+        ThreadGroup threadGroup2 = new ThreadGroup("testGroup2");
+        Thread thread2 = new Thread(threadGroup2, () -> {
+            try ( final PathChildrenCache cache = new 
PathChildrenCache(client, "/test1", true) ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread2.start();
+        thread2.join();
+        assertNull(exception.get());
+    }
 }
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 2b9e491c..93692e6c 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -25,9 +25,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
@@ -640,4 +643,45 @@ public class TestTreeCache extends BaseTestTreeCache
 
         assertNoMoreEvents();
     }
+
+    @Test
+    public void testIsolatedThreadGroup() throws Exception {
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new 
RetryOneTime(1));
+        client.getUnhandledErrorListenable().addListener((message, e) -> 
exception.set(e));
+        client.start();
+
+        ThreadGroup threadGroup1 = new ThreadGroup("testGroup1");
+        Thread thread1 = new Thread(threadGroup1, () -> {
+            try ( final TreeCache cache = new TreeCache(client, "/test1") ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread1.start();
+        thread1.join();
+        assertNull(exception.get());
+
+        // After the thread group is destroyed, all TreeCache instances
+        // will fail to start due to inability to add new threads into the 
first thread group
+        threadGroup1.destroy();
+
+        ThreadGroup threadGroup2 = new ThreadGroup("testGroup2");
+        Thread thread2 = new Thread(threadGroup2, () -> {
+            try ( final TreeCache cache = new TreeCache(client, "/test1") ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread2.start();
+        thread2.join();
+        assertNull(exception.get());
+    }
 }

Reply via email to