This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b8af4cea1b3 CAMEL-21823: camel-core - Propagate MDC custom keys for 
WireTap and OnCompletion EIPs (#17338)
b8af4cea1b3 is described below

commit b8af4cea1b3eae720ae920697f55fba4a02d6d9b
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 3 19:47:38 2025 +0000

    CAMEL-21823: camel-core - Propagate MDC custom keys for WireTap and 
OnCompletion EIPs (#17338)
---
 .../apache/camel/processor/MulticastProcessor.java |  35 +----
 .../camel/processor/OnCompletionProcessor.java     |  47 ++++---
 .../apache/camel/processor/ProcessorHelper.java    |  35 +++++
 .../apache/camel/processor/WireTapProcessor.java   |   3 +-
 .../processor/MDCOnCompletionParallelTest.java     | 140 ++++++++++++++++++++
 .../camel/processor/MDCWireTapCustomKeysTest.java  | 144 +++++++++++++++++++++
 .../org/apache/camel/processor/MDCWireTapTest.java |   4 +-
 .../ROOT/pages/camel-4x-upgrade-guide-4_11.adoc    |  10 +-
 8 files changed, 355 insertions(+), 63 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 7ec13021502..0b3e6e8797d 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -68,7 +68,6 @@ import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LRUCacheFactory;
-import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
@@ -80,6 +79,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import static 
org.apache.camel.processor.ProcessorHelper.prepareMDCParallelTask;
 import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
@@ -383,7 +383,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     protected void schedule(final Runnable runnable, boolean sync) {
         if (isParallelProcessing()) {
-            Runnable task = prepareParallelTask(runnable);
+            Runnable task = prepareMDCParallelTask(camelContext, runnable);
             try {
                 executorService.submit(() -> 
reactiveExecutor.scheduleSync(task));
             } catch (RejectedExecutionException e) {
@@ -398,37 +398,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         }
     }
 
-    private Runnable prepareParallelTask(Runnable runnable) {
-        Runnable answer = runnable;
-
-        // if MDC is enabled we need to propagate the information
-        // to the sub task which is executed on another thread from the thread 
pool
-        if (camelContext.isUseMDCLogging()) {
-            String pattern = camelContext.getMDCLoggingKeysPattern();
-            Map<String, String> mdc = MDC.getCopyOfContextMap();
-            if (mdc != null && !mdc.isEmpty()) {
-                answer = () -> {
-                    try {
-                        if (pattern == null || "*".equals(pattern)) {
-                            mdc.forEach(MDC::put);
-                        } else {
-                            final String[] patterns = pattern.split(",");
-                            mdc.forEach((k, v) -> {
-                                if (PatternHelper.matchPatterns(k, patterns)) {
-                                    MDC.put(k, v);
-                                }
-                            });
-                        }
-                    } finally {
-                        runnable.run();
-                    }
-                };
-            }
-        }
-
-        return answer;
-    }
-
     protected abstract class MulticastTask implements Runnable, Rejectable {
 
         final Exchange original;
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index cfdd5d6b5db..bc9d4b2de38 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -18,7 +18,6 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
@@ -42,6 +41,7 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.camel.processor.ProcessorHelper.prepareMDCParallelTask;
 import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
@@ -162,7 +162,7 @@ public class OnCompletionProcessor extends 
AsyncProcessorSupport implements Trac
      * @param processor the processor
      * @param exchange  the exchange
      */
-    protected static void doProcess(Processor processor, Exchange exchange) {
+    protected void doProcess(Processor processor, Exchange exchange) {
         // must remember some properties which we cannot use during 
onCompletion processing
         // as otherwise we may cause issues
         // but keep the caused exception stored as a property 
(Exchange.EXCEPTION_CAUGHT) on the exchange
@@ -299,13 +299,12 @@ public class OnCompletionProcessor extends 
AsyncProcessorSupport implements Trac
             final Exchange copy = prepareExchange(exchange);
 
             if (executorService != null) {
-                executorService.submit(new Callable<Exchange>() {
-                    public Exchange call() throws Exception {
-                        LOG.debug("Processing onComplete: {}", copy);
-                        doProcess(processor, copy);
-                        return copy;
-                    }
-                });
+                Runnable task = () -> {
+                    LOG.debug("Processing onComplete: {}", copy);
+                    doProcess(processor, copy);
+                };
+                task = prepareMDCParallelTask(camelContext, task);
+                executorService.submit(task);
             } else {
                 // run without thread-pool
                 LOG.debug("Processing onComplete: {}", copy);
@@ -329,15 +328,14 @@ public class OnCompletionProcessor extends 
AsyncProcessorSupport implements Trac
             }
 
             if (executorService != null) {
-                executorService.submit(new Callable<Exchange>() {
-                    public Exchange call() throws Exception {
-                        LOG.debug("Processing onFailure: {}", copy);
-                        doProcess(processor, copy);
-                        // restore exception after processing
-                        copy.setException(original);
-                        return null;
-                    }
-                });
+                Runnable task = () -> {
+                    LOG.debug("Processing onFailure: {}", copy);
+                    doProcess(processor, copy);
+                    // restore exception after processing
+                    copy.setException(original);
+                };
+                task = prepareMDCParallelTask(camelContext, task);
+                executorService.submit(task);
             } else {
                 // run without thread-pool
                 LOG.debug("Processing onFailure: {}", copy);
@@ -460,13 +458,12 @@ public class OnCompletionProcessor extends 
AsyncProcessorSupport implements Trac
                     final Exchange copy = prepareExchange(exchange);
 
                     if (executorService != null) {
-                        executorService.submit(new Callable<Exchange>() {
-                            public Exchange call() throws Exception {
-                                LOG.debug("Processing onAfterRoute: {}", copy);
-                                doProcess(processor, copy);
-                                return copy;
-                            }
-                        });
+                        Runnable task = () -> {
+                            LOG.debug("Processing onAfterRoute: {}", copy);
+                            doProcess(processor, copy);
+                        };
+                        task = prepareMDCParallelTask(camelContext, task);
+                        executorService.submit(task);
                     } else {
                         // run without thread-pool
                         LOG.debug("Processing onAfterRoute: {}", copy);
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
index c7498df3a45..45e57e53fca 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
@@ -17,12 +17,16 @@
 
 package org.apache.camel.processor;
 
+import java.util.Map;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.spi.NormalizedEndpointUri;
+import org.apache.camel.support.PatternHelper;
+import org.slf4j.MDC;
 
 final class ProcessorHelper {
 
@@ -70,4 +74,35 @@ final class ProcessorHelper {
         }
         return null;
     }
+
+    static Runnable prepareMDCParallelTask(CamelContext camelContext, Runnable 
runnable) {
+        Runnable answer = runnable;
+
+        // if MDC is enabled we need to propagate the information
+        // to the sub task which is executed on another thread from the thread 
pool
+        if (camelContext.isUseMDCLogging()) {
+            String pattern = camelContext.getMDCLoggingKeysPattern();
+            Map<String, String> mdc = MDC.getCopyOfContextMap();
+            if (mdc != null && !mdc.isEmpty()) {
+                answer = () -> {
+                    try {
+                        if (pattern == null || "*".equals(pattern)) {
+                            mdc.forEach(MDC::put);
+                        } else {
+                            final String[] patterns = pattern.split(",");
+                            mdc.forEach((k, v) -> {
+                                if (PatternHelper.matchPatterns(k, patterns)) {
+                                    MDC.put(k, v);
+                                }
+                            });
+                        }
+                    } finally {
+                        runnable.run();
+                    }
+                };
+            }
+        }
+
+        return answer;
+    }
 }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index ee533fb4c47..52db18bb17e 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -201,7 +201,8 @@ public class WireTapProcessor extends AsyncProcessorSupport
         // send the exchange to the destination using an executor service
         try {
             // create task which has state used during routing
-            PooledExchangeTask task = taskFactory.acquire(target, null);
+            Runnable task = taskFactory.acquire(target, null);
+            task = ProcessorHelper.prepareMDCParallelTask(camelContext, task);
             executorService.submit(task);
         } catch (Exception e) {
             // in case the thread pool rejects or cannot submit the task then 
we need to catch
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionParallelTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionParallelTest.java
new file mode 100644
index 00000000000..657b00524fc
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionParallelTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCOnCompletionParallelTest extends ContextTestSupport {
+
+    @Test
+    public void testMDC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // enable MDC and breadcrumb
+                context.setUseMDCLogging(true);
+                context.setUseBreadcrumb(true);
+                context.setMDCLoggingKeysPattern("custom*,my*");
+
+                MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+                from("direct:a").routeId("route-async")
+                        
.onCompletion().parallelProcessing().process(checker).to("mock:end").end()
+                        .process(e -> {
+                            // custom is propagated
+                            MDC.put("custom.hello", "World");
+                            // foo is not propagated
+                            MDC.put("foo", "Bar");
+                            // myKey is propagated
+                            MDC.put("myKey", "Baz");
+                        })
+                        .process(checker)
+                        .to("mock:result");
+
+            }
+        };
+    }
+
+    /**
+     * Stores values from the first invocation to compare them with the second 
invocation later.
+     */
+    private static class MdcCheckerProcessor implements Processor {
+
+        private String exchangeId;
+        private String messageId;
+        private String breadcrumbId;
+        private String contextId;
+        private Long threadId;
+        private String foo;
+
+        @Override
+        public void process(Exchange exchange) {
+            // custom is propagated as its pattern matches
+            assertEquals("World", MDC.get("custom.hello"));
+            assertEquals("Baz", MDC.get("myKey"));
+
+            if (foo != null) {
+                // foo is not propagated
+                assertNotEquals(foo, MDC.get("foo"));
+            } else {
+                foo = MDC.get("foo");
+            }
+
+            if (threadId != null) {
+                long currId = Thread.currentThread().getId();
+                assertNotEquals(threadId, (Object) currId);
+            } else {
+                threadId = Thread.currentThread().getId();
+            }
+
+            String routeId = "route-async";
+            if (routeId != null) {
+                assertEquals(routeId, MDC.get("camel.routeId"));
+            }
+
+            if (exchangeId != null) {
+                assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+            } else {
+                exchangeId = MDC.get("camel.exchangeId");
+                assertTrue(exchangeId != null && !exchangeId.isEmpty());
+            }
+
+            if (messageId != null) {
+                assertNotEquals(messageId, MDC.get("camel.messageId"));
+            } else {
+                messageId = MDC.get("camel.messageId");
+                assertTrue(messageId != null && !messageId.isEmpty());
+            }
+
+            if (breadcrumbId != null) {
+                assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+            } else {
+                breadcrumbId = MDC.get("camel.breadcrumbId");
+                assertTrue(breadcrumbId != null && !breadcrumbId.isEmpty());
+            }
+
+            if (contextId != null) {
+                assertEquals(contextId, MDC.get("camel.contextId"));
+            } else {
+                contextId = MDC.get("camel.contextId");
+                assertTrue(contextId != null && !contextId.isEmpty());
+            }
+
+        }
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapCustomKeysTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapCustomKeysTest.java
new file mode 100644
index 00000000000..77ac6668210
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapCustomKeysTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCWireTapCustomKeysTest extends ContextTestSupport {
+
+    @Test
+    public void testMDC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // enable MDC and breadcrumb
+                context.setUseMDCLogging(true);
+                context.setUseBreadcrumb(true);
+                context.setMDCLoggingKeysPattern("custom*,my*");
+
+                MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+                from("direct:a").routeId("route-async")
+                        .process(e -> {
+                            // custom is propagated
+                            MDC.put("custom.hello", "World");
+                            // foo is not propagated
+                            MDC.put("foo", "Bar");
+                            // myKey is propagated
+                            MDC.put("myKey", "Baz");
+                        })
+                        //.process(checker)
+                        .to("log:foo")
+                        .wireTap("direct:tap")
+                        .to("mock:a");
+
+                from("direct:tap").routeId("tap-route")
+                        .process(checker)
+                        .to("mock:end");
+            }
+        };
+    }
+
+    /**
+     * Stores values from the first invocation to compare them with the second 
invocation later.
+     */
+    private static class MdcCheckerProcessor implements Processor {
+
+        private String exchangeId;
+        private String messageId;
+        private String breadcrumbId;
+        private String contextId;
+        private Long threadId;
+        private String foo;
+
+        @Override
+        public void process(Exchange exchange) {
+            // custom is propagated as its pattern matches
+            assertEquals("World", MDC.get("custom.hello"));
+            assertEquals("Baz", MDC.get("myKey"));
+
+            if (foo != null) {
+                // foo is not propagated
+                assertNotEquals(foo, MDC.get("foo"));
+            } else {
+                foo = MDC.get("foo");
+            }
+
+            if (threadId != null) {
+                long currId = Thread.currentThread().getId();
+                assertNotEquals(threadId, (Object) currId);
+            } else {
+                threadId = Thread.currentThread().getId();
+            }
+
+            String routeId = "tap-route";
+            if (routeId != null) {
+                assertEquals(routeId, MDC.get("camel.routeId"));
+            }
+
+            if (exchangeId != null) {
+                assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+            } else {
+                exchangeId = MDC.get("camel.exchangeId");
+                assertTrue(exchangeId != null && !exchangeId.isEmpty());
+            }
+
+            if (messageId != null) {
+                assertNotEquals(messageId, MDC.get("camel.messageId"));
+            } else {
+                messageId = MDC.get("camel.messageId");
+                assertTrue(messageId != null && !messageId.isEmpty());
+            }
+
+            if (breadcrumbId != null) {
+                assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+            } else {
+                breadcrumbId = MDC.get("camel.breadcrumbId");
+                assertTrue(breadcrumbId != null && !breadcrumbId.isEmpty());
+            }
+
+            if (contextId != null) {
+                assertEquals(contextId, MDC.get("camel.contextId"));
+            } else {
+                contextId = MDC.get("camel.contextId");
+                assertTrue(contextId != null && !contextId.isEmpty());
+            }
+
+        }
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
index 88f95f35e9f..8abff997274 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
@@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.MDC;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class MDCWireTapTest extends ContextTestSupport {
 
@@ -64,8 +63,7 @@ public class MDCWireTapTest extends ContextTestSupport {
                     public void process(Exchange exchange) {
                         assertEquals("route-b", MDC.get("camel.routeId"));
                         assertEquals(exchange.getExchangeId(), 
MDC.get("camel.exchangeId"));
-                        // custom MDC is not propagated
-                        assertNull(MDC.get("custom.id"));
+                        assertEquals("1", MDC.get("custom.id"));
                     }
                 }).to("log:b-done").to("mock:b");
             }
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
index 7d81cf99610..5d0b7ec757d 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
@@ -6,7 +6,9 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
 
 == Upgrading Camel 4.10 to 4.11
 
-=== Recipient List, Split and Multicast EIP
+=== EIPs
+
+==== Recipient List, Split and Multicast EIP
 
 In parallel processing mode, you can also enable `synchronous=true` to force 
these EIPs to process
 the sub-tasks using the upper bounds of the thread-pool. If using 
`synchronous=false` then Camel
@@ -15,6 +17,12 @@ due to sub-tasks using other thread-pools such as 
`CompletableFuture.runAsync` o
 
 Setting `synchronous=true` is the same behaviour as in Camel 2 which did not 
have the reactive routing engine.
 
+==== WireTap and OnCompletion EIP
+
+When MDC is enabled, then the WireTap and OnCompletion (in parallel mode) will 
now propagate MDC
+context when creating threads to process the exchanges. This makes these EIPs 
similar to how
+other EIPs such as the Multicast EIP already do this.
+
 === camel-api
 
 Added `removeTraits` method to `org.apache.camel.Message`.

Reply via email to