This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new b6057920297 CAMEL-21255: camel-core - Add listener for creating
ThreadFactory in … (#15679)
b6057920297 is described below
commit b60579202970c2e81ec3667303f785b2817f8277
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Sep 25 13:34:44 2024 +0200
CAMEL-21255: camel-core - Add listener for creating ThreadFactory in …
(#15679)
* CAMEL-21255: camel-core - Add listener for creating ThreadFactory in
ExecutorServiceManager
---
.../apache/camel/spi/ExecutorServiceManager.java | 29 ++++++++++++++
.../impl/engine/BaseExecutorServiceManager.java | 31 ++++++++++++++-
.../impl/DefaultExecutorServiceManagerTest.java | 45 ++++++++++++++++++++++
3 files changed, 103 insertions(+), 2 deletions(-)
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
index 5a4726914f7..f2e2b1a1c83 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
@@ -19,6 +19,7 @@ package org.apache.camel.spi;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import org.apache.camel.ShutdownableService;
import org.apache.camel.StaticService;
@@ -52,6 +53,34 @@ import org.apache.camel.StaticService;
*/
public interface ExecutorServiceManager extends ShutdownableService,
StaticService {
+ /**
+ * Listener invoked when a new {@link ThreadFactory} is created, which allows
plugging in custom behaviour.
+ */
+ @FunctionalInterface
+ interface ThreadFactoryListener {
+
+ /**
+ * Service factory key.
+ */
+ String FACTORY = "thread-factory-listener";
+
+ /**
+ * Invoked when Camel has created a new {@link ThreadFactory} to be
used by this
+ * {@link ExecutorServiceManager}.
+ *
+ * @param factory the created factory
+ * @return the factory to be used by this {@link
ExecutorServiceManager}.
+ */
+ ThreadFactory onNewThreadFactory(ThreadFactory factory);
+ }
+
+ /**
+ * Adds a custom {@link ThreadFactoryListener} to use
+ *
+ * @param threadFactoryListener the thread factory listener
+ */
+ void addThreadFactoryListener(ThreadFactoryListener threadFactoryListener);
+
/**
* Gets the {@link ThreadPoolFactory} to use for creating the thread pools.
*
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index 7aae8d9e463..ee953195cd3 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -39,6 +39,7 @@ import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.DefaultThreadPoolFactory;
+import org.apache.camel.support.OrderedComparator;
import org.apache.camel.support.ResolverHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
@@ -63,6 +64,7 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
private final CamelContext camelContext;
private final List<ExecutorService> executorServices = new
CopyOnWriteArrayList<>();
private final Map<String, ThreadPoolProfile> threadPoolProfiles = new
ConcurrentHashMap<>();
+ private final List<ThreadFactoryListener> threadFactoryListeners = new
CopyOnWriteArrayList<>();
private ThreadPoolFactory threadPoolFactory;
private String threadNamePattern;
private long shutdownAwaitTermination = 10000;
@@ -89,6 +91,11 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
return camelContext;
}
+ @Override
+ public void addThreadFactoryListener(ThreadFactoryListener
threadFactoryListener) {
+ threadFactoryListeners.add(threadFactoryListener);
+ }
+
@Override
public ThreadPoolFactory getThreadPoolFactory() {
return threadPoolFactory;
@@ -447,11 +454,26 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
}
CamelContextAware.trySetCamelContext(threadPoolFactory, camelContext);
ServiceHelper.initService(threadPoolFactory);
+
+ // discover custom thread factory listener via Camel factory finder
+ ResolverHelper.resolveService(
+ camelContext,
+
camelContext.getCamelContextExtension().getBootstrapFactoryFinder(),
+ ThreadFactoryListener.FACTORY,
+
ThreadFactoryListener.class).ifPresent(this::addThreadFactoryListener);
}
@Override
protected void doStart() throws Exception {
super.doStart();
+
+ Set<ThreadFactoryListener> listeners =
camelContext.getRegistry().findByType(ThreadFactoryListener.class);
+ if (listeners != null && !listeners.isEmpty()) {
+ threadFactoryListeners.addAll(listeners);
+ }
+ if (!threadFactoryListeners.isEmpty()) {
+ threadFactoryListeners.sort(OrderedComparator.get());
+ }
ServiceHelper.startService(threadPoolFactory);
}
@@ -504,6 +526,7 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
}
ServiceHelper.stopAndShutdownServices(threadPoolFactory);
+ threadFactoryListeners.clear();
}
/**
@@ -568,8 +591,12 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
onNewExecutorService(executorService);
}
- protected ThreadFactory createThreadFactory(String name, boolean isDaemon)
{
- return new CamelThreadFactory(threadNamePattern, name, isDaemon);
+ protected ThreadFactory createThreadFactory(String name, boolean daemon) {
+ ThreadFactory factory = new CamelThreadFactory(threadNamePattern,
name, daemon);
+ for (ThreadFactoryListener listener : threadFactoryListeners) {
+ factory = listener.onNewThreadFactory(factory);
+ }
+ return factory;
}
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index d6896cc87e2..0cf702024d1 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -18,10 +18,13 @@ package org.apache.camel.impl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
@@ -564,4 +567,46 @@ public class DefaultExecutorServiceManagerTest extends
ContextTestSupport {
assertTrue(pool.isTerminated());
}
+ @Test
+ public void testThreadFactoryListener() {
+ // custom thread factory
+ ThreadFactory myFactory = r -> new Thread(r, "MyFactory");
+ // hook custom factory into Camel
+ context.getExecutorServiceManager().addThreadFactoryListener(factory
-> myFactory);
+ // create thread
+ Thread thread = context.getExecutorServiceManager().newThread("Cool",
() -> {
+ // noop
+ });
+
+ assertNotNull(thread);
+ assertTrue(thread.isDaemon());
+ // should be created by custom factory instead of Camel
+ assertTrue(thread.getName().contains("MyFactory"));
+ }
+
+ @Test
+ public void testThreadFactoryListenerViaRegistry() {
+ // create another CamelContext as camelContext is already started in
this test-class
+ CamelContext c = new DefaultCamelContext();
+
+ // custom thread factory
+ ThreadFactory myFactory = r -> new Thread(r, "MyFactory2");
+ // hook custom factory into Camel via registry
+ ExecutorServiceManager.ThreadFactoryListener listener = factory ->
myFactory;
+ c.getRegistry().bind("myListener", listener);
+ c.start();
+
+ // create thread
+ Thread thread = c.getExecutorServiceManager().newThread("Cool2", () ->
{
+ // noop
+ });
+
+ assertNotNull(thread);
+ assertTrue(thread.isDaemon());
+ // should be created by custom factory instead of Camel
+ assertTrue(thread.getName().contains("MyFactory2"));
+
+ c.stop();
+ }
+
}