This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push: new 3554f177d14 CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally. (#14180) 3554f177d14 is described below commit 3554f177d14bdcd7a65a77a60b7c5344b2ef96e2 Author: Denis Istomin <istomin....@gmail.com> AuthorDate: Sat May 18 11:12:24 2024 +0500 CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally. (#14180) Co-authored-by: Claus Ibsen <claus.ib...@gmail.com> --- .../apache/camel/spi/InterceptableProcessor.java | 35 ++++++++++++++ .../apache/camel/impl/engine/DefaultChannel.java | 39 ++++++++------- .../org/apache/camel/processor/CatchProcessor.java | 8 +++- .../apache/camel/processor/FinallyProcessor.java | 9 +++- .../org/apache/camel/processor/TryProcessor.java | 9 +++- .../org/apache/camel/reifier/InterceptReifier.java | 1 + .../intercept/InterceptDoTryCatchTest.java | 56 ++++++++++++++++++++++ 7 files changed, 137 insertions(+), 20 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java new file mode 100644 index 00000000000..a3463272a10 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Processor; + +/** + * To control whether a {@link Processor} can be intercepted via {@link InterceptStrategy}. + * + * Some EIPs such as try/catch/finally cannot be intercepted. + */ +public interface InterceptableProcessor { + + /** + * Whether the processor can be intercepted or not. + * + * @return true to allow intercepting, false to skip. + */ + boolean canIntercept(); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java index 07539ebedc0..61ba5bc3c03 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java @@ -36,6 +36,7 @@ import org.apache.camel.spi.BacklogDebugger; import org.apache.camel.spi.Debugger; import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer; import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.InterceptableProcessor; import org.apache.camel.spi.ManagementInterceptStrategy; import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.Tracer; @@ -228,24 +229,28 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { Collections.reverse(interceptors); // wrap the output with the configured interceptors Processor target = nextProcessor; - for (InterceptStrategy strategy : interceptors) { - Processor next = target == nextProcessor ? null : nextProcessor; - // use the fine grained definition (eg the child if available). Its always possible to get back to the parent - Processor wrapped = strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef, target, next); - if (!(wrapped instanceof AsyncProcessor)) { - LOG.warn("Interceptor: {} at: {} does not return an AsyncProcessor instance." - + " This causes the asynchronous routing engine to not work as optimal as possible." - + " See more details at the InterceptStrategy javadoc." - + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine," - + " but its not the most optimal solution. Please consider changing your interceptor to comply.", - strategy, definition); - } - if (!(wrapped instanceof WrapAwareProcessor)) { - // wrap the target so it becomes a service and we can manage its lifecycle - wrapped = PluginHelper.getInternalProcessorFactory(camelContext) - .createWrapProcessor(wrapped, target); + boolean skip = target instanceof InterceptableProcessor && !((InterceptableProcessor) target).canIntercept(); + if (!skip) { + for (InterceptStrategy strategy : interceptors) { + Processor next = target == nextProcessor ? null : nextProcessor; + // use the fine grained definition (eg the child if available). Its always possible to get back to the parent + Processor wrapped + = strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef, target, next); + if (!(wrapped instanceof AsyncProcessor)) { + LOG.warn("Interceptor: {} at: {} does not return an AsyncProcessor instance." + + " This causes the asynchronous routing engine to not work as optimal as possible." + + " See more details at the InterceptStrategy javadoc." + + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine," + + " but its not the most optimal solution. Please consider changing your interceptor to comply.", + strategy, definition); + } + if (!(wrapped instanceof WrapAwareProcessor)) { + // wrap the target so it becomes a service and we can manage its lifecycle + wrapped = PluginHelper.getInternalProcessorFactory(camelContext) + .createWrapProcessor(wrapped, target); + } + target = wrapped; } - target = wrapped; } if (route.isStreamCaching()) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java index 53bd9b73214..dc814fd6f0f 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -31,6 +31,7 @@ import org.apache.camel.Processor; import org.apache.camel.RollbackExchangeException; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.InterceptableProcessor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; @@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory; /** * A processor which catches exceptions. */ -public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { +public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware, InterceptableProcessor { private static final Logger LOG = LoggerFactory.getLogger(CatchProcessor.class); @@ -110,6 +111,11 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, return "catch"; } + @Override + public boolean canIntercept() { + return false; + } + @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { final Exception e = exchange.getException(); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java index 8c8f0ab2da3..c8a2bf3d200 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -22,6 +22,7 @@ import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.InterceptableProcessor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -31,7 +32,8 @@ import org.slf4j.LoggerFactory; /** * Processor to handle do finally supporting asynchronous routing engine */ -public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware { +public class FinallyProcessor extends DelegateAsyncProcessor + implements Traceable, IdAware, RouteIdAware, InterceptableProcessor { private static final Logger LOG = LoggerFactory.getLogger(FinallyProcessor.class); @@ -90,6 +92,11 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl this.routeId = routeId; } + @Override + public boolean canIntercept() { + return false; + } + private static final class FinallyAsyncCallback implements AsyncCallback { private final Exchange exchange; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java index 8db60f2cfcc..2cb4a81953a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -29,6 +29,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.InterceptableProcessor; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -41,7 +42,8 @@ import org.slf4j.LoggerFactory; /** * Implements try/catch/finally type processing */ -public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware { +public class TryProcessor extends AsyncProcessorSupport + implements Navigate<Processor>, Traceable, IdAware, RouteIdAware, InterceptableProcessor { private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class); @@ -72,6 +74,11 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc return "doTry"; } + @Override + public boolean canIntercept() { + return false; + } + @Override public boolean process(Exchange exchange, AsyncCallback callback) { reactiveExecutor.schedule(new TryState(exchange, callback)); diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java index 264551b07ad..363fc66f5a7 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java @@ -45,6 +45,7 @@ public class InterceptReifier<T extends InterceptDefinition> extends ProcessorRe public Processor wrapProcessorInInterceptors( CamelContext context, NamedNode definition, Processor target, Processor nextTarget) throws Exception { + // store the target we are intercepting this.interceptedTarget = target; diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java new file mode 100644 index 00000000000..f02e9b9cbef --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.intercept; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +public class InterceptDoTryCatchTest extends ContextTestSupport { + + @Test + public void testIntercept() throws Exception { + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(1); + + getMockEndpoint("mock:intercepted").expectedMessageCount(4); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + intercept().to("mock:intercepted"); + + from("direct:start") + .to("mock:foo") + .doTry() + .throwException(new IllegalArgumentException("Forced")) + .doCatch(Exception.class) + .to("mock:bar") + .end() + .to("mock:result"); + } + }; + } +}