This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new 3554f177d14 CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally. (#14180)
3554f177d14 is described below
commit 3554f177d14bdcd7a65a77a60b7c5344b2ef96e2
Author: Denis Istomin <[email protected]>
AuthorDate: Sat May 18 11:12:24 2024 +0500
CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally. (#14180)
Co-authored-by: Claus Ibsen <[email protected]>
---
.../apache/camel/spi/InterceptableProcessor.java | 35 ++++++++++++++
.../apache/camel/impl/engine/DefaultChannel.java | 39 ++++++++-------
.../org/apache/camel/processor/CatchProcessor.java | 8 +++-
.../apache/camel/processor/FinallyProcessor.java | 9 +++-
.../org/apache/camel/processor/TryProcessor.java | 9 +++-
.../org/apache/camel/reifier/InterceptReifier.java | 1 +
.../intercept/InterceptDoTryCatchTest.java | 56 ++++++++++++++++++++++
7 files changed, 137 insertions(+), 20 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java
new file mode 100644
index 00000000000..a3463272a10
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * To control whether a {@link Processor} can be intercepted via {@link InterceptStrategy}.
+ *
+ * Some EIPs such as try/catch/finally cannot be intercepted.
+ */
+public interface InterceptableProcessor {
+
+ /**
+ * Whether the processor can be intercepted or not.
+ *
+ * @return true to allow intercepting, false to skip.
+ */
+ boolean canIntercept();
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index 07539ebedc0..61ba5bc3c03 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -36,6 +36,7 @@ import org.apache.camel.spi.BacklogDebugger;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.Tracer;
@@ -228,24 +229,28 @@ public class DefaultChannel extends
CamelInternalProcessor implements Channel {
Collections.reverse(interceptors);
// wrap the output with the configured interceptors
Processor target = nextProcessor;
- for (InterceptStrategy strategy : interceptors) {
- Processor next = target == nextProcessor ? null : nextProcessor;
- // use the fine grained definition (eg the child if available).
Its always possible to get back to the parent
- Processor wrapped =
strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef,
target, next);
- if (!(wrapped instanceof AsyncProcessor)) {
- LOG.warn("Interceptor: {} at: {} does not return an
AsyncProcessor instance."
- + " This causes the asynchronous routing engine to
not work as optimal as possible."
- + " See more details at the InterceptStrategy
javadoc."
- + " Camel will use a bridge to adapt the interceptor
to the asynchronous routing engine,"
- + " but its not the most optimal solution. Please
consider changing your interceptor to comply.",
- strategy, definition);
- }
- if (!(wrapped instanceof WrapAwareProcessor)) {
- // wrap the target so it becomes a service and we can manage
its lifecycle
- wrapped =
PluginHelper.getInternalProcessorFactory(camelContext)
- .createWrapProcessor(wrapped, target);
+ boolean skip = target instanceof InterceptableProcessor &&
!((InterceptableProcessor) target).canIntercept();
+ if (!skip) {
+ for (InterceptStrategy strategy : interceptors) {
+ Processor next = target == nextProcessor ? null :
nextProcessor;
+ // use the fine grained definition (eg the child if
available). Its always possible to get back to the parent
+ Processor wrapped
+ =
strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef,
target, next);
+ if (!(wrapped instanceof AsyncProcessor)) {
+ LOG.warn("Interceptor: {} at: {} does not return an
AsyncProcessor instance."
+ + " This causes the asynchronous routing engine
to not work as optimal as possible."
+ + " See more details at the InterceptStrategy
javadoc."
+ + " Camel will use a bridge to adapt the
interceptor to the asynchronous routing engine,"
+ + " but its not the most optimal solution. Please
consider changing your interceptor to comply.",
+ strategy, definition);
+ }
+ if (!(wrapped instanceof WrapAwareProcessor)) {
+ // wrap the target so it becomes a service and we can
manage its lifecycle
+ wrapped =
PluginHelper.getInternalProcessorFactory(camelContext)
+ .createWrapProcessor(wrapped, target);
+ }
+ target = wrapped;
}
- target = wrapped;
}
if (route.isStreamCaching()) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
index 53bd9b73214..dc814fd6f0f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -31,6 +31,7 @@ import org.apache.camel.Processor;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
@@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
/**
* A processor which catches exceptions.
*/
-public class CatchProcessor extends DelegateAsyncProcessor implements
Traceable, IdAware, RouteIdAware {
+public class CatchProcessor extends DelegateAsyncProcessor implements
Traceable, IdAware, RouteIdAware, InterceptableProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(CatchProcessor.class);
@@ -110,6 +111,11 @@ public class CatchProcessor extends DelegateAsyncProcessor
implements Traceable,
return "catch";
}
+ @Override
+ public boolean canIntercept() {
+ return false;
+ }
+
@Override
public boolean process(final Exchange exchange, final AsyncCallback
callback) {
final Exception e = exchange.getException();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index 8c8f0ab2da3..c8a2bf3d200 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -22,6 +22,7 @@ import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -31,7 +32,8 @@ import org.slf4j.LoggerFactory;
/**
* Processor to handle do finally supporting asynchronous routing engine
*/
-public class FinallyProcessor extends DelegateAsyncProcessor implements
Traceable, IdAware, RouteIdAware {
+public class FinallyProcessor extends DelegateAsyncProcessor
+ implements Traceable, IdAware, RouteIdAware, InterceptableProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(FinallyProcessor.class);
@@ -90,6 +92,11 @@ public class FinallyProcessor extends DelegateAsyncProcessor
implements Traceabl
this.routeId = routeId;
}
+ @Override
+ public boolean canIntercept() {
+ return false;
+ }
+
private static final class FinallyAsyncCallback implements AsyncCallback {
private final Exchange exchange;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
index 8db60f2cfcc..2cb4a81953a 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -29,6 +29,7 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -41,7 +42,8 @@ import org.slf4j.LoggerFactory;
/**
* Implements try/catch/finally type processing
*/
-public class TryProcessor extends AsyncProcessorSupport implements
Navigate<Processor>, Traceable, IdAware, RouteIdAware {
+public class TryProcessor extends AsyncProcessorSupport
+ implements Navigate<Processor>, Traceable, IdAware, RouteIdAware,
InterceptableProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(TryProcessor.class);
@@ -72,6 +74,11 @@ public class TryProcessor extends AsyncProcessorSupport
implements Navigate<Proc
return "doTry";
}
+ @Override
+ public boolean canIntercept() {
+ return false;
+ }
+
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
reactiveExecutor.schedule(new TryState(exchange, callback));
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
index 264551b07ad..363fc66f5a7 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
@@ -45,6 +45,7 @@ public class InterceptReifier<T extends InterceptDefinition>
extends ProcessorRe
public Processor wrapProcessorInInterceptors(
CamelContext context, NamedNode definition, Processor
target, Processor nextTarget)
throws Exception {
+
// store the target we are intercepting
this.interceptedTarget = target;
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
new file mode 100644
index 00000000000..f02e9b9cbef
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.intercept;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class InterceptDoTryCatchTest extends ContextTestSupport {
+
+ @Test
+ public void testIntercept() throws Exception {
+ getMockEndpoint("mock:foo").expectedMessageCount(1);
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ getMockEndpoint("mock:intercepted").expectedMessageCount(4);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ intercept().to("mock:intercepted");
+
+ from("direct:start")
+ .to("mock:foo")
+ .doTry()
+ .throwException(new IllegalArgumentException("Forced"))
+ .doCatch(Exception.class)
+ .to("mock:bar")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+}