This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 83de5488 CURATOR-640: Run PathChildrenCache's & TreeCache's default
thread isolated (#419)
83de5488 is described below
commit 83de54888d39173acd7ddad0435f955e4ba736b1
Author: tison <[email protected]>
AuthorDate: Wed Jun 15 19:43:13 2022 +0800
CURATOR-640: Run PathChildrenCache's & TreeCache's default thread isolated
(#419)
* CURATOR-640: Run PathChildrenCache default thread isolated
Signed-off-by: tison <[email protected]>
---
.../framework/recipes/cache/PathChildrenCache.java | 7 ++--
.../curator/framework/recipes/cache/TreeCache.java | 7 ++--
.../recipes/cache/TestPathChildrenCache.java | 41 ++++++++++++++++++++
.../framework/recipes/cache/TestTreeCache.java | 44 ++++++++++++++++++++++
4 files changed, 93 insertions(+), 6 deletions(-)
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 73930fd4..9121a49f 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -25,6 +25,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.function.Supplier;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
@@ -138,7 +139,7 @@ public class PathChildrenCache implements Closeable
handleStateChange(newState);
}
};
- public static final ThreadFactory defaultThreadFactory =
ThreadUtils.newThreadFactory("PathChildrenCache");
+ public static final Supplier<ThreadFactory> defaultThreadFactorySupplier =
() -> ThreadUtils.newThreadFactory("PathChildrenCache");
/**
* @param client the client
@@ -150,7 +151,7 @@ public class PathChildrenCache implements Closeable
@SuppressWarnings("deprecation")
public PathChildrenCache(CuratorFramework client, String path,
PathChildrenCacheMode mode)
{
- this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY,
false, new
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
true));
+ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY,
false, new
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()),
true));
}
/**
@@ -174,7 +175,7 @@ public class PathChildrenCache implements Closeable
*/
public PathChildrenCache(CuratorFramework client, String path, boolean
cacheData)
{
- this(client, path, cacheData, false, new
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
true));
+ this(client, path, cacheData, false, new
CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()),
true));
}
/**
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 13b9c595..5e7deb76 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import java.util.function.Supplier;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
@@ -108,7 +109,7 @@ public class TreeCache implements Closeable
ExecutorService executor = executorService;
if ( executor == null )
{
- executor =
Executors.newSingleThreadExecutor(defaultThreadFactory);
+ executor =
Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get());
}
return new TreeCache(client, path, cacheData, dataIsCompressed,
maxDepth, executor, createParentNodes, disableZkWatches, selector);
}
@@ -554,7 +555,7 @@ public class TreeCache implements Closeable
}
};
- static final ThreadFactory defaultThreadFactory =
ThreadUtils.newThreadFactory("TreeCache");
+ private static final Supplier<ThreadFactory> defaultThreadFactorySupplier
= () -> ThreadUtils.newThreadFactory("TreeCache");
/**
* Create a TreeCache for the given client and path with default options.
@@ -571,7 +572,7 @@ public class TreeCache implements Closeable
*/
public TreeCache(CuratorFramework client, String path)
{
- this(client, path, true, false, Integer.MAX_VALUE,
Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new
DefaultTreeCacheSelector());
+ this(client, path, true, false, Integer.MAX_VALUE,
Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()), false,
false, new DefaultTreeCacheSelector());
}
/**
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 3790b34c..89dd2fc5 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -1151,4 +1151,45 @@ public class TestPathChildrenCache extends
BaseClassForTests
TestCleanState.closeAndTestClean(client);
}
}
+
+ @Test
+ public void testIsolatedThreadGroup() throws Exception {
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+
+ CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new
RetryOneTime(1));
+ client.getUnhandledErrorListenable().addListener((message, e) ->
exception.set(e));
+ client.start();
+
+ ThreadGroup threadGroup1 = new ThreadGroup("testGroup1");
+ Thread thread1 = new Thread(threadGroup1, () -> {
+ try ( final PathChildrenCache cache = new
PathChildrenCache(client, "/test1", true) ) {
+ cache.start();
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ exception.set(e);
+ }
+ });
+
+ thread1.start();
+ thread1.join();
+ assertNull(exception.get());
+
+ // After the thread group is destroyed, PathChildrenCache instances sharing one default
+ // thread factory would fail to start, unable to add new threads into the first thread group
+ threadGroup1.destroy();
+
+ ThreadGroup threadGroup2 = new ThreadGroup("testGroup2");
+ Thread thread2 = new Thread(threadGroup2, () -> {
+ try ( final PathChildrenCache cache = new
PathChildrenCache(client, "/test1", true) ) {
+ cache.start();
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ exception.set(e);
+ }
+ });
+
+ thread2.start();
+ thread2.join();
+ assertNull(exception.get());
+ }
}
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 2b9e491c..93692e6c 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -25,9 +25,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
@@ -640,4 +643,45 @@ public class TestTreeCache extends BaseTestTreeCache
assertNoMoreEvents();
}
+
+ @Test
+ public void testIsolatedThreadGroup() throws Exception {
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+
+ CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new
RetryOneTime(1));
+ client.getUnhandledErrorListenable().addListener((message, e) ->
exception.set(e));
+ client.start();
+
+ ThreadGroup threadGroup1 = new ThreadGroup("testGroup1");
+ Thread thread1 = new Thread(threadGroup1, () -> {
+ try ( final TreeCache cache = new TreeCache(client, "/test1") ) {
+ cache.start();
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ exception.set(e);
+ }
+ });
+
+ thread1.start();
+ thread1.join();
+ assertNull(exception.get());
+
+ // After the thread group is destroyed, TreeCache instances sharing one default
+ // thread factory would fail to start, unable to add new threads into the first thread group
+ threadGroup1.destroy();
+
+ ThreadGroup threadGroup2 = new ThreadGroup("testGroup2");
+ Thread thread2 = new Thread(threadGroup2, () -> {
+ try ( final TreeCache cache = new TreeCache(client, "/test1") ) {
+ cache.start();
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ exception.set(e);
+ }
+ });
+
+ thread2.start();
+ thread2.join();
+ assertNull(exception.get());
+ }
}