This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new de1c8f38a69 CAMEL-22295: camel-core - Consumer can setup MDC with 
route id during… (#18773)
de1c8f38a69 is described below

commit de1c8f38a69ea436ced7242356b82d4383151872
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jul 30 20:02:10 2025 +0200

    CAMEL-22295: camel-core - Consumer can setup MDC with route id during… 
(#18773)
    
    * CAMEL-22295: camel-core - MDC for Camel based threads used by sources 
that are sticky to a given route
---
 .../aws2/kinesis/KclKinesis2Consumer.java          |  2 +-
 .../component/aws2/kinesis/Kinesis2Endpoint.java   |  4 +-
 .../camel/component/debezium/DebeziumConsumer.java |  2 +-
 .../camel/component/debezium/DebeziumEndpoint.java |  4 +-
 .../pubsublite/GooglePubsubLiteConsumer.java       |  2 +-
 .../pubsublite/GooglePubsubLiteEndpoint.java       |  4 +-
 .../google/pubsub/GooglePubsubConsumer.java        |  2 +-
 .../google/pubsub/GooglePubsubEndpoint.java        |  4 +-
 .../hazelcast/queue/HazelcastQueueConsumer.java    |  2 +-
 .../hazelcast/queue/HazelcastQueueEndpoint.java    |  4 +-
 .../smn/SimpleNotificationEndpoint.java            |  6 --
 .../apache/camel/component/jms/JmsProducer.java    |  4 +-
 .../camel/component/kafka/KafkaConsumer.java       |  2 +-
 .../camel/component/kafka/KafkaEndpoint.java       |  8 +--
 .../camel/component/kafka/KafkaProducer.java       |  2 +-
 .../kubernetes/AbstractKubernetesEndpoint.java     |  4 +-
 .../config_maps/KubernetesConfigMapsConsumer.java  |  2 +-
 .../KubernetesCustomResourcesConsumer.java         |  2 +-
 .../deployments/KubernetesDeploymentsConsumer.java |  2 +-
 .../events/KubernetesEventsConsumer.java           |  2 +-
 .../kubernetes/hpa/KubernetesHPAConsumer.java      |  2 +-
 .../namespaces/KubernetesNamespacesConsumer.java   |  2 +-
 .../kubernetes/nodes/KubernetesNodesConsumer.java  |  2 +-
 .../kubernetes/pods/KubernetesPodsConsumer.java    |  2 +-
 .../KubernetesReplicationControllersConsumer.java  |  2 +-
 .../services/KubernetesServicesConsumer.java       |  2 +-
 .../OpenshiftDeploymentConfigsConsumer.java        |  2 +-
 .../apache/camel/component/nats/NatsConsumer.java  |  2 +-
 .../apache/camel/component/nats/NatsEndpoint.java  |  4 +-
 ...TelemetryInstrumentedThreadFactoryListener.java |  2 +-
 .../camel/component/pgevent/PgEventConsumer.java   |  2 +-
 .../camel/component/pgevent/PgEventEndpoint.java   |  4 +-
 .../camel/component/rocketmq/RocketMQProducer.java |  2 +-
 .../apache/camel/component/sjms/SjmsProducer.java  |  4 +-
 .../apache/camel/spi/ExecutorServiceManager.java   |  3 +-
 .../impl/engine/BaseExecutorServiceManager.java    | 19 +++++--
 .../impl/engine/MDCThreadFactoryListener.java      | 63 +++++++++++++++++++++
 .../impl/DefaultExecutorServiceManagerTest.java    |  4 +-
 .../camel/processor/MDCRouteIdAwareTest.java       | 66 ++++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_14.adoc    |  3 +
 40 files changed, 195 insertions(+), 61 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
index 3921456c2ff..8e1a25bda35 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
@@ -138,7 +138,7 @@ public class KclKinesis2Consumer extends DefaultConsumer {
         } else {
             cloudWatchAsyncClient = 
getEndpoint().getConfiguration().getCloudWatchAsyncClient();
         }
-        this.executor = this.getEndpoint().createExecutor();
+        this.executor = this.getEndpoint().createExecutor(this);
         this.executor.submit(new KclKinesisConsumingTask(
                 configuration.getStreamName(), 
configuration.getApplicationName(), kinesisAsyncClient, dynamoDbAsyncClient,
                 cloudWatchAsyncClient, 
configuration.isKclDisableCloudwatchMetricsExport()));
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index f7a9e1aa7a3..ca493afb553 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -157,8 +157,8 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint 
implements EndpointS
         return null;
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
                 "KinesisStream[" + configuration.getStreamName() + "]", 1);
     }
 }
diff --git 
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
 
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
index 48e198ee8de..8a89c7b94f3 100644
--- 
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
+++ 
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
@@ -50,7 +50,7 @@ public class DebeziumConsumer extends DefaultConsumer {
         super.doStart();
 
         // start a single threaded pool to monitor events
-        executorService = endpoint.createExecutor();
+        executorService = endpoint.createExecutor(this);
 
         // create engine
         dbzEngine = createDbzEngine();
diff --git 
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
 
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
index 9ca582f2ab2..46310445755 100644
--- 
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
+++ 
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
@@ -59,8 +59,8 @@ public abstract class DebeziumEndpoint<C extends 
EmbeddedDebeziumConfiguration>
         return consumer;
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(source,
                 "DebeziumConsumer");
     }
 
diff --git 
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
 
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
index 128def8432d..95afd73a0c7 100644
--- 
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
@@ -58,7 +58,7 @@ public class GooglePubsubLiteConsumer extends DefaultConsumer 
{
     protected void doStart() throws Exception {
         super.doStart();
         localLog.info("Starting Google PubSub Lite consumer for {}/{}", 
endpoint.getProjectId(), endpoint.getDestinationName());
-        executor = endpoint.createExecutor();
+        executor = endpoint.createExecutor(this);
         for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
             executor.submit(new SubscriberWrapper());
         }
diff --git 
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
 
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
index d6d092f4d48..45ba9421723 100644
--- 
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
+++ 
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
@@ -132,8 +132,8 @@ public class GooglePubsubLiteEndpoint extends 
DefaultEndpoint implements Endpoin
         return consumer;
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
                 "GooglePubsubLiteConsumer[" + getDestinationName() + "]", 
concurrentConsumers);
     }
 
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 235bd536cc5..e83bebe362e 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -78,7 +78,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         super.doStart();
 
         localLog.info("Starting Google PubSub consumer for {}/{}", 
endpoint.getProjectId(), endpoint.getDestinationName());
-        executor = endpoint.createExecutor();
+        executor = endpoint.createExecutor(this);
         for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
             executor.submit(new SubscriberWrapper());
         }
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index cbae380335f..62a5d3ae0b3 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -142,8 +142,8 @@ public class GooglePubsubEndpoint extends DefaultEndpoint 
implements EndpointSer
         return consumer;
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
                 "GooglePubsubConsumer[" + getDestinationName() + "]", 
concurrentConsumers);
     }
 
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 856767a6712..5d98899c6fa 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -44,7 +44,7 @@ public class HazelcastQueueConsumer extends 
HazelcastDefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = ((HazelcastQueueEndpoint) getEndpoint()).createExecutor();
+        executor = ((HazelcastQueueEndpoint) 
getEndpoint()).createExecutor(this);
 
         CamelItemListener camelItemListener = new CamelItemListener(this, 
cacheName);
         queueConsumerTask = new QueueConsumerTask(camelItemListener);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
index ee7e27354ad..51794cd0c81 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
@@ -69,8 +69,8 @@ public class HazelcastQueueEndpoint extends 
HazelcastDefaultEndpoint {
         return new HazelcastQueueProducer(hazelcastInstance, this, cacheName);
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"QueueConsumer",
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source, 
"QueueConsumer",
                 configuration.getPoolSize());
     }
 
diff --git 
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
 
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
index 8140a65ae2d..e88919cd34f 100644
--- 
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
+++ 
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.huaweicloud.smn;
 
-import java.util.concurrent.ExecutorService;
-
 import com.huaweicloud.sdk.smn.v2.SmnClient;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
@@ -246,8 +244,4 @@ public class SimpleNotificationEndpoint extends 
DefaultEndpoint {
         this.smnClient = smnClient;
     }
 
-    public ExecutorService createExecutor() {
-        // TODO: Delete me when you implemented your custom component
-        return 
getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, 
"SimpleNotificationConsumer");
-    }
 }
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index 3eb0ce93c62..0ce743c0138 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -577,7 +577,7 @@ public class JmsProducer extends DefaultAsyncProducer {
 
         String name = "JmsReplyManagerTimeoutChecker[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
         ScheduledExecutorService replyManagerScheduledExecutorService
-                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 name);
         
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
 
         name = "JmsReplyManagerOnTimeout[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
@@ -610,7 +610,7 @@ public class JmsProducer extends DefaultAsyncProducer {
 
         String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
         ScheduledExecutorService replyManagerScheduledExecutorService
-                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 name);
         
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
 
         name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1d496d34a29..52cff9dc06b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -148,7 +148,7 @@ public class KafkaConsumer extends DefaultConsumer
             }
         }
 
-        executor = endpoint.createExecutor();
+        executor = endpoint.createExecutor(this);
 
         String topic = endpoint.getConfiguration().getTopic();
         Pattern pattern = null;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 61f341ad7b3..41549edbba4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -201,15 +201,15 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
         }
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
                 "KafkaConsumer[" + configuration.getTopic() + "]", 
configuration.getConsumersCount());
     }
 
-    public ExecutorService createProducerExecutor() {
+    public ExecutorService createProducerExecutor(Object source) {
         int core = getConfiguration().getWorkerPoolCoreSize();
         int max = getConfiguration().getWorkerPoolMaxSize();
-        return 
getCamelContext().getExecutorServiceManager().newThreadPool(this,
+        return 
getCamelContext().getExecutorServiceManager().newThreadPool(source,
                 "KafkaProducer[" + configuration.getTopic() + "]", core, max);
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index c8af6d224a9..99b6c856865 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -179,7 +179,7 @@ public class KafkaProducer extends DefaultAsyncProducer 
implements RouteIdAware
                 workerPool = configuration.getWorkerPool();
                 shutdownWorkerPool = false;
             } else {
-                workerPool = endpoint.createProducerExecutor();
+                workerPool = endpoint.createProducerExecutor(this);
                 // we create a thread pool so we should also shut it down
                 shutdownWorkerPool = true;
             }
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
index 406f35ce5b2..8082a526563 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
@@ -75,8 +75,8 @@ public abstract class AbstractKubernetesEndpoint extends 
DefaultEndpoint impleme
         }
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"KubernetesConsumer",
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source, 
"KubernetesConsumer",
                 configuration.getPoolSize());
     }
 
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
index b2288699fd3..16172967e9e 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesConfigMapsConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         configMapWatcher = new ConfigMapsConsumerTask();
         executor.submit(configMapWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
index 55699a7ec0d..b8cd2e3306b 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
@@ -56,7 +56,7 @@ public class KubernetesCustomResourcesConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         customResourcesWatcher = new CustomResourcesConsumerTask();
         executor.submit(customResourcesWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
index 3a2a7858229..10941a4beea 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesDeploymentsConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         deploymentsWatcher = new DeploymentsConsumerTask();
         executor.submit(deploymentsWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
index ed2ddb8cf3b..b9a3f329318 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesEventsConsumer extends DefaultConsumer 
{
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         eventWatcher = new EventsConsumerTask();
         executor.submit(eventWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
index c351853b615..927474cc68d 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesHPAConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         hpasWatcher = new HpaConsumerTask();
         executor.submit(hpasWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
index 67b7844f5fa..bc920a21b8d 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
@@ -56,7 +56,7 @@ public class KubernetesNamespacesConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         nsWatcher = new NamespacesConsumerTask();
         executor.submit(nsWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
index 026f8033eb9..35e07c5b70e 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
@@ -56,7 +56,7 @@ public class KubernetesNodesConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         nodesWatcher = new NodesConsumerTask();
         executor.submit(nodesWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
index 17909786da6..addcc543d53 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesPodsConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         podsWatcher = new PodsConsumerTask();
         executor.submit(podsWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
index fe3c22afc8e..9785ecaf494 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
@@ -58,7 +58,7 @@ public class KubernetesReplicationControllersConsumer extends 
DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
         rcWatcher = new ReplicationControllersConsumerTask();
         executor.submit(rcWatcher);
     }
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
index 637e08b997f..b92f165053c 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesServicesConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         servicesWatcher = new ServicesConsumerTask();
         executor.submit(servicesWatcher);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
index 685fb7e11a0..b66cb579d0c 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
@@ -58,7 +58,7 @@ public class OpenshiftDeploymentConfigsConsumer extends 
DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = getEndpoint().createExecutor();
+        executor = getEndpoint().createExecutor(this);
 
         deploymentsWatcher = new DeploymentsConfigConsumerTask();
         executor.submit(deploymentsWatcher);
diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 42b5c988c5c..13c4bf5f4d2 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -64,7 +64,7 @@ public class NatsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         LOG.debug("Starting Nats Consumer");
-        this.executor = this.getEndpoint().createExecutor();
+        this.executor = this.getEndpoint().createExecutor(this);
 
         LOG.debug("Getting Nats Connection");
         this.connection = 
this.getEndpoint().getConfiguration().getConnection() != null
diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
index 1ffa295af1b..ed1aaa74511 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
@@ -83,8 +83,8 @@ public class NatsEndpoint extends DefaultEndpoint
         return "nats";
     }
 
-    public ExecutorService createExecutor() {
-        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+    public ExecutorService createExecutor(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
                 "NatsTopic[" + configuration.getTopic() + "]", 
configuration.getPoolSize());
     }
 
diff --git 
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
 
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
index ef5bb9459cd..1ccde52c75c 100644
--- 
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
+++ 
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
@@ -26,7 +26,7 @@ import org.apache.camel.spi.annotations.JdkService;
 public class OpenTelemetryInstrumentedThreadFactoryListener implements 
ExecutorServiceManager.ThreadFactoryListener {
 
     @Override
-    public ThreadFactory onNewThreadFactory(ThreadFactory factory) {
+    public ThreadFactory onNewThreadFactory(Object source, ThreadFactory 
factory) {
         return runnable -> factory.newThread(Context.current().wrap(runnable));
     }
 }
diff --git 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index a751c2a38ca..83c7f585630 100644
--- 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -63,7 +63,7 @@ public class PgEventConsumer extends DefaultConsumer {
         if (endpoint.getWorkerPool() != null) {
             workerPool = endpoint.getWorkerPool();
         } else {
-            workerPool = endpoint.createWorkerPool();
+            workerPool = endpoint.createWorkerPool(this);
             shutdownWorkerPool = true;
         }
         // used for re-connecting to the database
diff --git 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
index 8e56d15a6b8..4cf0a8dec8d 100644
--- 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
+++ 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
@@ -195,8 +195,8 @@ public class PgEventEndpoint extends DefaultEndpoint 
implements EndpointServiceL
         return consumer;
     }
 
-    ExecutorService createWorkerPool() {
-        return 
getCamelContext().getExecutorServiceManager().newThreadPool(this,
+    ExecutorService createWorkerPool(Object source) {
+        return 
getCamelContext().getExecutorServiceManager().newThreadPool(source,
                 "PgEventConsumer[" + channel + "]", workerPoolCoreSize, 
workerPoolMaxSize);
     }
 
diff --git 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
index 0fd0f529c69..e6a181b2cc7 100644
--- 
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
+++ 
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -189,7 +189,7 @@ public class RocketMQProducer extends DefaultAsyncProducer {
         replyManager.setEndpoint(getEndpoint());
         String name = "RocketMQReplyManagerTimeoutChecker[" + 
getEndpoint().getTopicName() + "]";
         ScheduledExecutorService scheduledExecutorService
-                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 name);
         replyManager.setScheduledExecutorService(scheduledExecutorService);
         LOG.debug("Starting ReplyManager: {}", name);
         ServiceHelper.startService(replyManager);
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 6ff8154ea8a..6e38956785b 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -166,7 +166,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
         String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
         ScheduledExecutorService replyManagerScheduledExecutorService
-                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 name);
         
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
 
         name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
@@ -186,7 +186,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
         String name = "JmsReplyManagerTimeoutChecker[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
         ScheduledExecutorService replyManagerScheduledExecutorService
-                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 name);
         
temporaryQueueReplyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
 
         name = "JmsReplyManagerOnTimeout[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
index f2e2b1a1c83..34bdcc3826a 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
@@ -68,10 +68,11 @@ public interface ExecutorServiceManager extends 
ShutdownableService, StaticServi
          * Listener when Camel has created a new {@link ThreadFactory} to be 
used by this
          * {@link ExecutorServiceManager}.
          *
+         * @param  source  optional source where the thread is being used 
(such as a {@link org.apache.camel.Consumer}.
          * @param  factory the created factory
          * @return         the factory to use by this {@link 
ExecutorServiceManager}.
          */
-        ThreadFactory onNewThreadFactory(ThreadFactory factory);
+        ThreadFactory onNewThreadFactory(Object source, ThreadFactory factory);
     }
 
     /**
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index ee953195cd3..92a19b4e36a 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -167,7 +167,7 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
 
     @Override
     public Thread newThread(String name, Runnable runnable) {
-        ThreadFactory factory = createThreadFactory(name, true);
+        ThreadFactory factory = createThreadFactory(null, name, true);
         return factory.newThread(runnable);
     }
 
@@ -200,7 +200,7 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
         ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
         profile.addDefaults(defaultProfile);
 
-        ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
+        ThreadFactory threadFactory = createThreadFactory(source, 
sanitizedName, true);
         ExecutorService executorService = 
threadPoolFactory.newThreadPool(profile, threadFactory);
         onThreadPoolCreated(executorService, source, profile.getId());
         if (LOG.isDebugEnabled()) {
@@ -226,7 +226,7 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
     @Override
     public ExecutorService newCachedThreadPool(Object source, String name) {
         String sanitizedName = URISupport.sanitizeUri(name);
-        ExecutorService answer = 
threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
+        ExecutorService answer = 
threadPoolFactory.newCachedThreadPool(createThreadFactory(source, 
sanitizedName, true));
         onThreadPoolCreated(answer, source, null);
 
         if (LOG.isDebugEnabled()) {
@@ -260,7 +260,7 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
         String sanitizedName = URISupport.sanitizeUri(name);
         profile.addDefaults(getDefaultThreadPoolProfile());
         ScheduledExecutorService answer
-                = threadPoolFactory.newScheduledThreadPool(profile, 
createThreadFactory(sanitizedName, true));
+                = threadPoolFactory.newScheduledThreadPool(profile, 
createThreadFactory(source, sanitizedName, true));
         onThreadPoolCreated(answer, source, null);
 
         if (LOG.isDebugEnabled()) {
@@ -474,6 +474,13 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
         if (!threadFactoryListeners.isEmpty()) {
             threadFactoryListeners.sort(OrderedComparator.get());
         }
+
+        // enrich threads for MDC logging
+        boolean usedMDCLogging = getCamelContext().isUseMDCLogging() != null 
&& getCamelContext().isUseMDCLogging();
+        if (usedMDCLogging) {
+            threadFactoryListeners.add(new MDCThreadFactoryListener());
+        }
+
         ServiceHelper.startService(threadPoolFactory);
     }
 
@@ -591,10 +598,10 @@ public class BaseExecutorServiceManager extends 
ServiceSupport implements Execut
         onNewExecutorService(executorService);
     }
 
-    protected ThreadFactory createThreadFactory(String name, boolean daemon) {
+    protected ThreadFactory createThreadFactory(Object source, String name, 
boolean daemon) {
         ThreadFactory factory = new CamelThreadFactory(threadNamePattern, 
name, daemon);
         for (ThreadFactoryListener listener : threadFactoryListeners) {
-            factory = listener.onNewThreadFactory(factory);
+            factory = listener.onNewThreadFactory(source, factory);
         }
         return factory;
     }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCThreadFactoryListener.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCThreadFactoryListener.java
new file mode 100644
index 00000000000..b4a44d54fa4
--- /dev/null
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCThreadFactoryListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.UnitOfWork;
+import org.slf4j.MDC;
+
+/**
+ * MDC {@link 
org.apache.camel.spi.ExecutorServiceManager.ThreadFactoryListener} which will 
include the MDC information
+ * for route id which allows MDC logging to pin-point to the route that logs. 
This makes it possible to include this
+ * information earlier such as from the internal work that a consumer performs 
before routing
+ * {@link org.apache.camel.Exchange} where the {@link MDCUnitOfWork} would 
include this information.
+ */
+public class MDCThreadFactoryListener implements 
ExecutorServiceManager.ThreadFactoryListener {
+
+    @Override
+    public ThreadFactory onNewThreadFactory(Object source, ThreadFactory 
factory) {
+        if (source instanceof Consumer c && c instanceof RouteIdAware ra) {
+            String name = c.getEndpoint().getCamelContext().getName();
+            String routeId = ra.getRouteId();
+            if (routeId != null) {
+                return newThreadFactory(name, routeId, factory);
+            }
+        }
+        return factory;
+    }
+
+    private ThreadFactory newThreadFactory(String contextName, String routeId, 
ThreadFactory tf) {
+        return task -> {
+            Runnable wrapped = () -> {
+                MDC.put(UnitOfWork.MDC_CAMEL_CONTEXT_ID, contextName);
+                MDC.put(UnitOfWork.MDC_ROUTE_ID, routeId);
+                try {
+                    task.run();
+                } finally {
+                    MDC.remove(UnitOfWork.MDC_CAMEL_CONTEXT_ID);
+                    MDC.remove(UnitOfWork.MDC_ROUTE_ID);
+                }
+            };
+            return tf.newThread(wrapped);
+        };
+
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index 0cf702024d1..91ca8e0ee9e 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -572,7 +572,7 @@ public class DefaultExecutorServiceManagerTest extends 
ContextTestSupport {
         // custom thread factory
         ThreadFactory myFactory = r -> new Thread(r, "MyFactory");
         // hook custom factory into Camel
-        context.getExecutorServiceManager().addThreadFactoryListener(factory 
-> myFactory);
+        context.getExecutorServiceManager().addThreadFactoryListener(((source, 
factory) -> myFactory));
         // create thread
         Thread thread = context.getExecutorServiceManager().newThread("Cool", 
() -> {
             // noop
@@ -592,7 +592,7 @@ public class DefaultExecutorServiceManagerTest extends 
ContextTestSupport {
         // custom thread factory
         ThreadFactory myFactory = r -> new Thread(r, "MyFactory2");
         // hook custom factory into Camel via registry
-        ExecutorServiceManager.ThreadFactoryListener listener = factory -> 
myFactory;
+        ExecutorServiceManager.ThreadFactoryListener listener = (source, 
factory) -> myFactory;
         c.getRegistry().bind("myListener", listener);
         c.start();
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCRouteIdAwareTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCRouteIdAwareTest.java
new file mode 100644
index 00000000000..d2359cca0b0
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCRouteIdAwareTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileFilter;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.UnitOfWork;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+public class MDCRouteIdAwareTest extends ContextTestSupport {
+
+    @Test
+    public void testMDC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBodyAndHeader(fileUri(), "Hello World", 
Exchange.FILE_NAME, "hello.txt");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // enable MDC
+                context.setUseMDCLogging(true);
+                context.getRegistry().bind("myFilter", new MyFilter());
+
+                from(fileUri("?filter=#myFilter")).routeId("myRoute")
+                        .to("mock:result");
+
+            }
+        };
+    }
+
+    private class MyFilter implements GenericFileFilter {
+
+        @Override
+        public boolean accept(GenericFile file) {
+            String rid = MDC.get(UnitOfWork.MDC_ROUTE_ID);
+            String name = MDC.get(UnitOfWork.MDC_CAMEL_CONTEXT_ID);
+            return "myRoute".equals(rid) && context.getName().equals(name);
+        }
+    }
+}
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
index 714cc383f04..da0fb875900 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
@@ -8,6 +8,9 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
 
 === camel-core
 
+The `org.apache.camel.spi.ExecutorServiceManager.ThreadFactoryListener` has 
changed the method signature to include the source,
+so the method is changed from `ThreadFactory onNewThreadFactory(ThreadFactory 
factory)` to `ThreadFactory onNewThreadFactory(Object source, ThreadFactory 
factory)`
+
 ==== Splitter and Multicast EIPs
 
 When using `shareUnitOfWork=true` in Split or Multicast EIPs, then Camel will 
now use a single shared `UnitOfWork` instance (parent)

Reply via email to