This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b8af4cea1b3 CAMEL-21823: camel-core - Propagate MDC custom keys for
WireTap and OnCompletion EIPs (#17338)
b8af4cea1b3 is described below
commit b8af4cea1b3eae720ae920697f55fba4a02d6d9b
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 3 19:47:38 2025 +0000
CAMEL-21823: camel-core - Propagate MDC custom keys for WireTap and
OnCompletion EIPs (#17338)
---
.../apache/camel/processor/MulticastProcessor.java | 35 +----
.../camel/processor/OnCompletionProcessor.java | 47 ++++---
.../apache/camel/processor/ProcessorHelper.java | 35 +++++
.../apache/camel/processor/WireTapProcessor.java | 3 +-
.../processor/MDCOnCompletionParallelTest.java | 140 ++++++++++++++++++++
.../camel/processor/MDCWireTapCustomKeysTest.java | 144 +++++++++++++++++++++
.../org/apache/camel/processor/MDCWireTapTest.java | 4 +-
.../ROOT/pages/camel-4x-upgrade-guide-4_11.adoc | 10 +-
8 files changed, 355 insertions(+), 63 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 7ec13021502..0b3e6e8797d 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -68,7 +68,6 @@ import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LRUCacheFactory;
-import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
@@ -80,6 +79,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import static
org.apache.camel.processor.ProcessorHelper.prepareMDCParallelTask;
import static org.apache.camel.util.ObjectHelper.notNull;
/**
@@ -383,7 +383,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
protected void schedule(final Runnable runnable, boolean sync) {
if (isParallelProcessing()) {
- Runnable task = prepareParallelTask(runnable);
+ Runnable task = prepareMDCParallelTask(camelContext, runnable);
try {
executorService.submit(() ->
reactiveExecutor.scheduleSync(task));
} catch (RejectedExecutionException e) {
@@ -398,37 +398,6 @@ public class MulticastProcessor extends
AsyncProcessorSupport
}
}
- private Runnable prepareParallelTask(Runnable runnable) {
- Runnable answer = runnable;
-
- // if MDC is enabled we need to propagate the information
- // to the sub task which is executed on another thread from the thread
pool
- if (camelContext.isUseMDCLogging()) {
- String pattern = camelContext.getMDCLoggingKeysPattern();
- Map<String, String> mdc = MDC.getCopyOfContextMap();
- if (mdc != null && !mdc.isEmpty()) {
- answer = () -> {
- try {
- if (pattern == null || "*".equals(pattern)) {
- mdc.forEach(MDC::put);
- } else {
- final String[] patterns = pattern.split(",");
- mdc.forEach((k, v) -> {
- if (PatternHelper.matchPatterns(k, patterns)) {
- MDC.put(k, v);
- }
- });
- }
- } finally {
- runnable.run();
- }
- };
- }
- }
-
- return answer;
- }
-
protected abstract class MulticastTask implements Runnable, Rejectable {
final Exchange original;
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index cfdd5d6b5db..bc9d4b2de38 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -18,7 +18,6 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
@@ -42,6 +41,7 @@ import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.camel.processor.ProcessorHelper.prepareMDCParallelTask;
import static org.apache.camel.util.ObjectHelper.notNull;
/**
@@ -162,7 +162,7 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
* @param processor the processor
* @param exchange the exchange
*/
- protected static void doProcess(Processor processor, Exchange exchange) {
+ protected void doProcess(Processor processor, Exchange exchange) {
// must remember some properties which we cannot use during
onCompletion processing
// as otherwise we may cause issues
// but keep the caused exception stored as a property
(Exchange.EXCEPTION_CAUGHT) on the exchange
@@ -299,13 +299,12 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
final Exchange copy = prepareExchange(exchange);
if (executorService != null) {
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- LOG.debug("Processing onComplete: {}", copy);
- doProcess(processor, copy);
- return copy;
- }
- });
+ Runnable task = () -> {
+ LOG.debug("Processing onComplete: {}", copy);
+ doProcess(processor, copy);
+ };
+ task = prepareMDCParallelTask(camelContext, task);
+ executorService.submit(task);
} else {
// run without thread-pool
LOG.debug("Processing onComplete: {}", copy);
@@ -329,15 +328,14 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
}
if (executorService != null) {
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- LOG.debug("Processing onFailure: {}", copy);
- doProcess(processor, copy);
- // restore exception after processing
- copy.setException(original);
- return null;
- }
- });
+ Runnable task = () -> {
+ LOG.debug("Processing onFailure: {}", copy);
+ doProcess(processor, copy);
+ // restore exception after processing
+ copy.setException(original);
+ };
+ task = prepareMDCParallelTask(camelContext, task);
+ executorService.submit(task);
} else {
// run without thread-pool
LOG.debug("Processing onFailure: {}", copy);
@@ -460,13 +458,12 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
final Exchange copy = prepareExchange(exchange);
if (executorService != null) {
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- LOG.debug("Processing onAfterRoute: {}", copy);
- doProcess(processor, copy);
- return copy;
- }
- });
+ Runnable task = () -> {
+ LOG.debug("Processing onAfterRoute: {}", copy);
+ doProcess(processor, copy);
+ };
+ task = prepareMDCParallelTask(camelContext, task);
+ executorService.submit(task);
} else {
// run without thread-pool
LOG.debug("Processing onAfterRoute: {}", copy);
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
index c7498df3a45..45e57e53fca 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ProcessorHelper.java
@@ -17,12 +17,16 @@
package org.apache.camel.processor;
+import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.spi.NormalizedEndpointUri;
+import org.apache.camel.support.PatternHelper;
+import org.slf4j.MDC;
final class ProcessorHelper {
@@ -70,4 +74,35 @@ final class ProcessorHelper {
}
return null;
}
+
+ static Runnable prepareMDCParallelTask(CamelContext camelContext, Runnable
runnable) {
+ Runnable answer = runnable;
+
+ // if MDC is enabled we need to propagate the information
+ // to the sub task which is executed on another thread from the thread
pool
+ if (camelContext.isUseMDCLogging()) {
+ String pattern = camelContext.getMDCLoggingKeysPattern();
+ Map<String, String> mdc = MDC.getCopyOfContextMap();
+ if (mdc != null && !mdc.isEmpty()) {
+ answer = () -> {
+ try {
+ if (pattern == null || "*".equals(pattern)) {
+ mdc.forEach(MDC::put);
+ } else {
+ final String[] patterns = pattern.split(",");
+ mdc.forEach((k, v) -> {
+ if (PatternHelper.matchPatterns(k, patterns)) {
+ MDC.put(k, v);
+ }
+ });
+ }
+ } finally {
+ runnable.run();
+ }
+ };
+ }
+ }
+
+ return answer;
+ }
}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index ee533fb4c47..52db18bb17e 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -201,7 +201,8 @@ public class WireTapProcessor extends AsyncProcessorSupport
// send the exchange to the destination using an executor service
try {
// create task which has state used during routing
- PooledExchangeTask task = taskFactory.acquire(target, null);
+ Runnable task = taskFactory.acquire(target, null);
+ task = ProcessorHelper.prepareMDCParallelTask(camelContext, task);
executorService.submit(task);
} catch (Exception e) {
// in case the thread pool rejects or cannot submit the task then
we need to catch
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionParallelTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionParallelTest.java
new file mode 100644
index 00000000000..657b00524fc
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionParallelTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCOnCompletionParallelTest extends ContextTestSupport {
+
+ @Test
+ public void testMDC() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:a", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // enable MDC and breadcrumb
+ context.setUseMDCLogging(true);
+ context.setUseBreadcrumb(true);
+ context.setMDCLoggingKeysPattern("custom*,my*");
+
+ MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+ from("direct:a").routeId("route-async")
+
.onCompletion().parallelProcessing().process(checker).to("mock:end").end()
+ .process(e -> {
+ // custom is propagated
+ MDC.put("custom.hello", "World");
+ // foo is not propagated
+ MDC.put("foo", "Bar");
+ // myKey is propagated
+ MDC.put("myKey", "Baz");
+ })
+ .process(checker)
+ .to("mock:result");
+
+ }
+ };
+ }
+
+ /**
+ * Stores values from the first invocation to compare them with the second
invocation later.
+ */
+ private static class MdcCheckerProcessor implements Processor {
+
+ private String exchangeId;
+ private String messageId;
+ private String breadcrumbId;
+ private String contextId;
+ private Long threadId;
+ private String foo;
+
+ @Override
+ public void process(Exchange exchange) {
+ // custom is propagated as its pattern matches
+ assertEquals("World", MDC.get("custom.hello"));
+ assertEquals("Baz", MDC.get("myKey"));
+
+ if (foo != null) {
+ // foo is not propagated
+ assertNotEquals(foo, MDC.get("foo"));
+ } else {
+ foo = MDC.get("foo");
+ }
+
+ if (threadId != null) {
+ long currId = Thread.currentThread().getId();
+ assertNotEquals(threadId, (Object) currId);
+ } else {
+ threadId = Thread.currentThread().getId();
+ }
+
+ String routeId = "route-async";
+ if (routeId != null) {
+ assertEquals(routeId, MDC.get("camel.routeId"));
+ }
+
+ if (exchangeId != null) {
+ assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+ } else {
+ exchangeId = MDC.get("camel.exchangeId");
+ assertTrue(exchangeId != null && !exchangeId.isEmpty());
+ }
+
+ if (messageId != null) {
+ assertNotEquals(messageId, MDC.get("camel.messageId"));
+ } else {
+ messageId = MDC.get("camel.messageId");
+ assertTrue(messageId != null && !messageId.isEmpty());
+ }
+
+ if (breadcrumbId != null) {
+ assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+ } else {
+ breadcrumbId = MDC.get("camel.breadcrumbId");
+ assertTrue(breadcrumbId != null && !breadcrumbId.isEmpty());
+ }
+
+ if (contextId != null) {
+ assertEquals(contextId, MDC.get("camel.contextId"));
+ } else {
+ contextId = MDC.get("camel.contextId");
+ assertTrue(contextId != null && !contextId.isEmpty());
+ }
+
+ }
+ }
+
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapCustomKeysTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapCustomKeysTest.java
new file mode 100644
index 00000000000..77ac6668210
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapCustomKeysTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MDCWireTapCustomKeysTest extends ContextTestSupport {
+
+ @Test
+ public void testMDC() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:a", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // enable MDC and breadcrumb
+ context.setUseMDCLogging(true);
+ context.setUseBreadcrumb(true);
+ context.setMDCLoggingKeysPattern("custom*,my*");
+
+ MdcCheckerProcessor checker = new MdcCheckerProcessor();
+
+ from("direct:a").routeId("route-async")
+ .process(e -> {
+ // custom is propagated
+ MDC.put("custom.hello", "World");
+ // foo is not propagated
+ MDC.put("foo", "Bar");
+ // myKey is propagated
+ MDC.put("myKey", "Baz");
+ })
+ //.process(checker)
+ .to("log:foo")
+ .wireTap("direct:tap")
+ .to("mock:a");
+
+ from("direct:tap").routeId("tap-route")
+ .process(checker)
+ .to("mock:end");
+ }
+ };
+ }
+
+ /**
+ * Stores values from the first invocation to compare them with the second
invocation later.
+ */
+ private static class MdcCheckerProcessor implements Processor {
+
+ private String exchangeId;
+ private String messageId;
+ private String breadcrumbId;
+ private String contextId;
+ private Long threadId;
+ private String foo;
+
+ @Override
+ public void process(Exchange exchange) {
+ // custom is propagated as its pattern matches
+ assertEquals("World", MDC.get("custom.hello"));
+ assertEquals("Baz", MDC.get("myKey"));
+
+ if (foo != null) {
+ // foo is not propagated
+ assertNotEquals(foo, MDC.get("foo"));
+ } else {
+ foo = MDC.get("foo");
+ }
+
+ if (threadId != null) {
+ long currId = Thread.currentThread().getId();
+ assertNotEquals(threadId, (Object) currId);
+ } else {
+ threadId = Thread.currentThread().getId();
+ }
+
+ String routeId = "tap-route";
+ if (routeId != null) {
+ assertEquals(routeId, MDC.get("camel.routeId"));
+ }
+
+ if (exchangeId != null) {
+ assertNotEquals(exchangeId, MDC.get("camel.exchangeId"));
+ } else {
+ exchangeId = MDC.get("camel.exchangeId");
+ assertTrue(exchangeId != null && !exchangeId.isEmpty());
+ }
+
+ if (messageId != null) {
+ assertNotEquals(messageId, MDC.get("camel.messageId"));
+ } else {
+ messageId = MDC.get("camel.messageId");
+ assertTrue(messageId != null && !messageId.isEmpty());
+ }
+
+ if (breadcrumbId != null) {
+ assertEquals(breadcrumbId, MDC.get("camel.breadcrumbId"));
+ } else {
+ breadcrumbId = MDC.get("camel.breadcrumbId");
+ assertTrue(breadcrumbId != null && !breadcrumbId.isEmpty());
+ }
+
+ if (contextId != null) {
+ assertEquals(contextId, MDC.get("camel.contextId"));
+ } else {
+ contextId = MDC.get("camel.contextId");
+ assertTrue(contextId != null && !contextId.isEmpty());
+ }
+
+ }
+ }
+
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
index 88f95f35e9f..8abff997274 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCWireTapTest.java
@@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test;
import org.slf4j.MDC;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
public class MDCWireTapTest extends ContextTestSupport {
@@ -64,8 +63,7 @@ public class MDCWireTapTest extends ContextTestSupport {
public void process(Exchange exchange) {
assertEquals("route-b", MDC.get("camel.routeId"));
assertEquals(exchange.getExchangeId(),
MDC.get("camel.exchangeId"));
- // custom MDC is not propagated
- assertNull(MDC.get("custom.id"));
+ assertEquals("1", MDC.get("custom.id"));
}
}).to("log:b-done").to("mock:b");
}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
index 7d81cf99610..5d0b7ec757d 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_11.adoc
@@ -6,7 +6,9 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
== Upgrading Camel 4.10 to 4.11
-=== Recipient List, Split and Multicast EIP
+=== EIPs
+
+==== Recipient List, Split and Multicast EIP
In parallel processing mode, you can also enable `synchronous=true` to force
these EIPs to process
the sub-tasks using the upper bounds of the thread-pool. If using
`synchronous=false` then Camel
@@ -15,6 +17,12 @@ due to sub-tasks using other thread-pools such as
`CompletableFuture.runAsync` o
Setting `synchronous=true` is the same behaviour is in Camel 2 which did not
have the reactive routing engine.
+==== WireTap and OnCompletion EIP
+
+When MDC is enabled, then the WireTap and OnCompletion (in parallel mode) will
now propagate MDC
+context when creating threads to process the exchanges. This makes these EIPs
similar to how
+other EIPs such as Multicast EIP already does this.
+
=== camel-api
Added `removeTraits` method to `org.apache.camel.Message`.