This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch mdc in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1be229533cd1762ae8f4d06eddfc195264daffd6 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Dec 5 21:38:58 2025 +0100 CAMEL-22717: camel-mdc - MDC is lost when using interceptSendToEndpoint (and potentially other situations) --- .../org/apache/camel/mdc/MDCEventNotifier.java | 45 +++++++++ .../camel/mdc/MDCInterceptEndpointFactory.java | 56 +++++++++++ .../java/org/apache/camel/mdc/MDCProcessor.java | 27 ++++++ .../camel/mdc/MDCProcessorsInterceptStrategy.java | 32 ++++--- .../main/java/org/apache/camel/mdc/MDCService.java | 20 +++- .../camel/mdc/MDCInterceptToEndpointBeanTest.java | 76 +++++++++++++++ .../camel-mdc/src/test/resources/log4j2.properties | 2 +- .../processor/MDCInterceptToEndpointBeanTest.java | 68 +++++++++++++ .../processor/MDCInterceptToEndpointTest.java | 105 +++++++++++++++++++++ .../src/test/resources/log4j2.properties | 6 +- 10 files changed, 418 insertions(+), 19 deletions(-) diff --git a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCEventNotifier.java b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCEventNotifier.java new file mode 100644 index 000000000000..b532f8281fb6 --- /dev/null +++ b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCEventNotifier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.mdc; + +import org.apache.camel.spi.CamelEvent; +import org.apache.camel.support.SimpleEventNotifierSupport; + +class MDCEventNotifier extends SimpleEventNotifierSupport { + + private final MDCService mdc; + + public MDCEventNotifier(MDCService mdc) { + // ignore these + setIgnoreCamelContextEvents(true); + setIgnoreCamelContextInitEvents(true); + setIgnoreRouteEvents(true); + setIgnoreServiceEvents(true); + setIgnoreStepEvents(true); + // we need also async processing started events + setIgnoreExchangeAsyncProcessingStartedEvents(false); + this.mdc = mdc; + } + + @Override + public void notify(CamelEvent event) throws Exception { + if (event instanceof CamelEvent.ExchangeAsyncProcessingStartedEvent eap) { + // exchange is continued processed on another thread so unset MDC + mdc.unsetMDC(eap.getExchange()); + } + } +} diff --git a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCInterceptEndpointFactory.java b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCInterceptEndpointFactory.java new file mode 100644 index 000000000000..682d7023ddd4 --- /dev/null +++ b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCInterceptEndpointFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.mdc; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.spi.InterceptEndpointFactory; +import org.apache.camel.spi.InterceptSendToEndpoint; +import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.support.DefaultInterceptSendToEndpoint; + +@Deprecated +public class MDCInterceptEndpointFactory implements InterceptEndpointFactory { + + private final InterceptStrategy interceptStrategy; + + public MDCInterceptEndpointFactory(InterceptStrategy interceptStrategy) { + this.interceptStrategy = interceptStrategy; + } + + @Override + public InterceptSendToEndpoint createInterceptSendToEndpoint(CamelContext camelContext, Endpoint endpoint, + boolean skip, Predicate onWhen, + Processor before, Processor after) { + DefaultInterceptSendToEndpoint answer = new DefaultInterceptSendToEndpoint(endpoint, skip); + answer.setOnWhen(onWhen); + // before should be wrapped + if (before != null) { + try { + before = interceptStrategy.wrapProcessorInInterceptors(camelContext, null, before, null); + } catch (Exception e) { + throw RuntimeCamelException.wrapRuntimeException(e); + } + } + answer.setBefore(before); + answer.setAfter(after); + return answer; + } +} diff --git a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessor.java b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessor.java new file mode 100644 index 000000000000..27bf8f9498f3 --- /dev/null +++ b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessor.java @@ -0,0 +1,27 @@ +package org.apache.camel.mdc; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.support.processor.DelegateProcessor; + +@Deprecated +public class MDCProcessor extends DelegateProcessor { + + private final MDCService mdc; + + public MDCProcessor(MDCService mdc, Processor processor) { + super(processor); + this.mdc = mdc; + } + + @Override + public void process(Exchange exchange) throws Exception { + mdc.setMDC(exchange); + try { + super.process(exchange); + } finally { +// mdc.unsetMDC(exchange); + } + } + +} diff --git a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessorsInterceptStrategy.java b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessorsInterceptStrategy.java index e48d7bda9cd2..b6e7893826f4 100644 --- a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessorsInterceptStrategy.java +++ b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCProcessorsInterceptStrategy.java @@ -26,6 +26,8 @@ import org.apache.camel.NamedNode; import org.apache.camel.Processor; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.service.ServiceHelper; /** * MDCProcessorsInterceptStrategy is used to wrap each processor calls and generate the MDC context for each process @@ -49,26 +51,18 @@ public class MDCProcessorsInterceptStrategy implements InterceptStrategy { final AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(target); - return new AsyncProcessor() { + return new AsyncProcessorSupport() { @Override public boolean process(Exchange exchange, AsyncCallback callback) { mdcService.setMDC(exchange); - boolean answer = asyncProcessor.process(exchange, doneSync -> { - callback.done(doneSync); - }); - mdcService.unsetMDC(exchange); - return answer; + return asyncProcessor.process(exchange, callback); } @Override public void process(Exchange exchange) throws Exception { mdcService.setMDC(exchange); - try { - asyncProcessor.process(exchange); - } finally { - mdcService.unsetMDC(exchange); - } + asyncProcessor.process(exchange); } @Override @@ -82,9 +76,23 @@ public class MDCProcessorsInterceptStrategy implements InterceptStrategy { future.complete(exchange); } }); - mdcService.unsetMDC(exchange); return future; } + + @Override + protected void doInit() throws Exception { + ServiceHelper.initService(target); + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startService(target); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(target); + } }; } diff --git a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCService.java b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCService.java index 89959a042e77..41de922d53b1 100644 --- a/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCService.java +++ b/components/camel-mdc/src/main/java/org/apache/camel/mdc/MDCService.java @@ -25,6 +25,7 @@ import org.apache.camel.spi.CamelMDCService; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LogListener; import org.apache.camel.spi.annotations.JdkService; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -43,6 +44,7 @@ public class MDCService extends ServiceSupport implements CamelMDCService { static String MDC_CAMEL_CONTEXT_ID = "camel.contextId"; private static final Logger LOG = LoggerFactory.getLogger(MDCService.class); + private final MDCEventNotifier eventNotifier = new MDCEventNotifier(this); private CamelContext camelContext; @@ -92,17 +94,28 @@ public class MDCService extends ServiceSupport implements CamelMDCService { @Override public void doInit() { ObjectHelper.notNull(camelContext, "CamelContext", this); - camelContext.getCamelContextExtension().addLogListener(new MDCLogListener()); +// camelContext.getCamelContextExtension().addLogListener(new MDCLogListener()); InterceptStrategy interceptStrategy = new MDCProcessorsInterceptStrategy(this); camelContext.getCamelContextExtension().addInterceptStrategy(interceptStrategy); +// camelContext.getCamelContextExtension().addContextPlugin(InterceptEndpointFactory.class, new MDCInterceptEndpointFactory(interceptStrategy)); + camelContext.getManagementStrategy().addEventNotifier(eventNotifier); + } @Override protected void doStart() throws Exception { super.doStart(); + ServiceHelper.startService(eventNotifier); LOG.info("Mapped Diagnostic Context (MDC) enabled"); } + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + camelContext.getManagementStrategy().removeEventNotifier(eventNotifier); + ServiceHelper.stopService(eventNotifier); + } + private void setOrUnsetMDC(Exchange exchange, boolean push) { try { // Default values @@ -135,17 +148,18 @@ public class MDCService extends ServiceSupport implements CamelMDCService { setOrUnsetMDC(exchange, false); } + @Deprecated private final class MDCLogListener implements LogListener { @Override public String onLog(Exchange exchange, CamelLogger camelLogger, String message) { - setMDC(exchange); +// setMDC(exchange); return message; } @Override public void afterLog(Exchange exchange, CamelLogger camelLogger, String message) { - unsetMDC(exchange); +// unsetMDC(exchange); } } diff --git a/components/camel-mdc/src/test/java/org/apache/camel/mdc/MDCInterceptToEndpointBeanTest.java b/components/camel-mdc/src/test/java/org/apache/camel/mdc/MDCInterceptToEndpointBeanTest.java new file mode 100644 index 000000000000..fd218f04e24b --- /dev/null +++ b/components/camel-mdc/src/test/java/org/apache/camel/mdc/MDCInterceptToEndpointBeanTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.mdc; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +public class MDCInterceptToEndpointBeanTest extends CamelTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger("MyRoute.myBean"); + + private Processor myBean = new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + LOG.info("MDC Values lost"); + } + }; + + @Override + protected CamelContext createCamelContext() throws Exception { + MDCService mdcSvc = new MDCService(); + CamelContext context = super.createCamelContext(); + CamelContextAware.trySetCamelContext(mdcSvc, context); + mdcSvc.init(context); + return context; + } + + @Test + public void testMDC() throws Exception { + template.sendBody("direct:start", "Hello World"); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + context.getRegistry().bind("myMapper", myBean); + + interceptSendToEndpoint("bean*").to("direct:beforeSend"); + + from("direct:start") + .log(LoggingLevel.INFO, "MyRoute.logBefore", "MDC Values present") + .to("bean:myMapper") + .log(LoggingLevel.INFO, "MyRoute.logAfter", "MDC Values present"); + + from("direct:beforeSend") + .log(LoggingLevel.INFO, "MyRoute.beforeSend", "MDC Values present"); + + } + }; + } +} diff --git a/components/camel-mdc/src/test/resources/log4j2.properties b/components/camel-mdc/src/test/resources/log4j2.properties index 731b4840a105..52ea10b332cd 100644 --- a/components/camel-mdc/src/test/resources/log4j2.properties +++ b/components/camel-mdc/src/test/resources/log4j2.properties @@ -27,4 +27,4 @@ appender.file.layout.type = PatternLayout appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m [%X{camel.threadId}, %X{camel.contextId}, %X{camel.routeId}, %X{camel.exchangeId}, %X{camel.messageId}, %X{head1}, %X{prop1}, %X{head2}, %X{prop2}]%n rootLogger.level = INFO -rootLogger.appenderRef.file.ref = file +rootLogger.appenderRef.file.ref = console diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCInterceptToEndpointBeanTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCInterceptToEndpointBeanTest.java new file mode 100644 index 000000000000..6d151fc9b8b1 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCInterceptToEndpointBeanTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MDCInterceptToEndpointBeanTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MDCInterceptToEndpointBeanTest.class); + + private Processor myBean = new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + LOG.info("MDC Values lost"); + } + }; + + @Test + public void testMDC() throws Exception { + template.sendBody("direct:start", "Hello World"); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // enable MDC + context.setUseMDCLogging(true); + + context.getRegistry().bind("myMapper", myBean); + + interceptSendToEndpoint("bean*").to("direct:beforeSend"); + + from("direct:start") + .log(LoggingLevel.INFO, "MyRoute.logBefore", "MDC Values present") + .to("bean:myMapper") + .log(LoggingLevel.INFO, "MyRoute.logAfter", "MDC Values present"); + + from("direct:beforeSend") + .log(LoggingLevel.INFO, "MyRoute.beforeSend", "MDC Values present"); + + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCInterceptToEndpointTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCInterceptToEndpointTest.java new file mode 100644 index 000000000000..0042cc7a5d95 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCInterceptToEndpointTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; +import org.slf4j.MDC; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MDCInterceptToEndpointTest extends ContextTestSupport { + + @Test + public void testMDC() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("direct:a", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testMDCTwoMessages() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody("direct:a", "Hello World"); + template.sendBody("direct:a", "Bye World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // enable MDC + context.setUseMDCLogging(true); + + interceptSendToEndpoint("direct:cheese").process(new Processor() { + public void process(Exchange exchange) { + assertEquals("route-a", MDC.get("camel.routeId")); + assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId")); + assertEquals(exchange.getIn().getMessageId(), MDC.get("camel.messageId")); + assertEquals("step-a", MDC.get("camel.stepId")); + MDC.put("custom.id", "1"); + } + }); + + from("direct:a").routeId("route-a").step("step-a").process(new Processor() { + public void process(Exchange exchange) { + assertEquals("route-a", MDC.get("camel.routeId")); + assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId")); + assertEquals(exchange.getIn().getMessageId(), MDC.get("camel.messageId")); + assertEquals("step-a", MDC.get("camel.stepId")); + MDC.put("custom.id", "1"); + } + }).to("log:foo") + .to("direct:cheese") + .to("direct:b"); + + from("direct:b").routeId("route-b").step("step-b").process(new Processor() { + public void process(Exchange exchange) { + assertEquals("route-b", MDC.get("camel.routeId")); + assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId")); + assertEquals(exchange.getIn().getMessageId(), MDC.get("camel.messageId")); + assertEquals("step-b", MDC.get("camel.stepId")); + assertEquals("1", MDC.get("custom.id")); + } + }).to("log:bar").to("mock:result"); + + from("direct:cheese").routeId("route-cheese").step("step-cheese").process(new Processor() { + public void process(Exchange exchange) { + assertEquals("route-cheese", MDC.get("camel.routeId")); + assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId")); + assertEquals(exchange.getIn().getMessageId(), MDC.get("camel.messageId")); + assertEquals("step-cheese", MDC.get("camel.stepId")); + assertEquals("1", MDC.get("custom.id")); + } + }); + + } + }; + } +} diff --git a/core/camel-core/src/test/resources/log4j2.properties b/core/camel-core/src/test/resources/log4j2.properties index eda60d476500..b63f2073e766 100644 --- a/core/camel-core/src/test/resources/log4j2.properties +++ b/core/camel-core/src/test/resources/log4j2.properties @@ -17,10 +17,10 @@ appender.console.type = Console appender.console.name = console appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +## appender.console.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n ## MDC layout -### appender.console.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %X{camel.exchangeId} %X{camel.breadcrumbId} - %m%n +appender.console.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %X{camel.exchangeId} %X{camel.breadcrumbId} - %m%n appender.file.type = File appender.file.name = file @@ -46,7 +46,7 @@ logger.file-cluster.level = INFO rootLogger.level = INFO rootLogger.appenderRef.file.ref = file -#rootLogger.appenderRef.console.ref = console +rootLogger.appenderRef.console.ref = console #logger.camel-core.name = org.apache.camel #logger.camel-core.level = INFO
