This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.4.x by this push:
     new 3554f177d14 CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally. (#14180)
3554f177d14 is described below

commit 3554f177d14bdcd7a65a77a60b7c5344b2ef96e2
Author: Denis Istomin <istomin....@gmail.com>
AuthorDate: Sat May 18 11:12:24 2024 +0500

    CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally. (#14180)
    
    Co-authored-by: Claus Ibsen <claus.ib...@gmail.com>
---
 .../apache/camel/spi/InterceptableProcessor.java   | 35 ++++++++++++++
 .../apache/camel/impl/engine/DefaultChannel.java   | 39 ++++++++-------
 .../org/apache/camel/processor/CatchProcessor.java |  8 +++-
 .../apache/camel/processor/FinallyProcessor.java   |  9 +++-
 .../org/apache/camel/processor/TryProcessor.java   |  9 +++-
 .../org/apache/camel/reifier/InterceptReifier.java |  1 +
 .../intercept/InterceptDoTryCatchTest.java         | 56 ++++++++++++++++++++++
 7 files changed, 137 insertions(+), 20 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java
new file mode 100644
index 00000000000..a3463272a10
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * To control whether a {@link Processor} can be intercepted via {@link InterceptStrategy}.
+ *
+ * Some EIPs such as try/catch/finally cannot be intercepted.
+ */
+public interface InterceptableProcessor {
+
+    /**
+     * Whether the processor can be intercepted or not.
+     *
+     * @return true to allow intercepting, false to skip.
+     */
+    boolean canIntercept();
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index 07539ebedc0..61ba5bc3c03 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -36,6 +36,7 @@ import org.apache.camel.spi.BacklogDebugger;
 import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InterceptableProcessor;
 import org.apache.camel.spi.ManagementInterceptStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.Tracer;
@@ -228,24 +229,28 @@ public class DefaultChannel extends 
CamelInternalProcessor implements Channel {
         Collections.reverse(interceptors);
         // wrap the output with the configured interceptors
         Processor target = nextProcessor;
-        for (InterceptStrategy strategy : interceptors) {
-            Processor next = target == nextProcessor ? null : nextProcessor;
-            // use the fine grained definition (eg the child if available). 
Its always possible to get back to the parent
-            Processor wrapped = 
strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef, 
target, next);
-            if (!(wrapped instanceof AsyncProcessor)) {
-                LOG.warn("Interceptor: {} at: {} does not return an 
AsyncProcessor instance."
-                         + " This causes the asynchronous routing engine to 
not work as optimal as possible."
-                         + " See more details at the InterceptStrategy 
javadoc."
-                         + " Camel will use a bridge to adapt the interceptor 
to the asynchronous routing engine,"
-                         + " but its not the most optimal solution. Please 
consider changing your interceptor to comply.",
-                        strategy, definition);
-            }
-            if (!(wrapped instanceof WrapAwareProcessor)) {
-                // wrap the target so it becomes a service and we can manage 
its lifecycle
-                wrapped = 
PluginHelper.getInternalProcessorFactory(camelContext)
-                        .createWrapProcessor(wrapped, target);
+        boolean skip = target instanceof InterceptableProcessor && !((InterceptableProcessor) target).canIntercept();
+        if (!skip) {
+            for (InterceptStrategy strategy : interceptors) {
+                Processor next = target == nextProcessor ? null : 
nextProcessor;
+                // use the fine grained definition (eg the child if 
available). Its always possible to get back to the parent
+                Processor wrapped
+                        = strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef, target, next);
+                if (!(wrapped instanceof AsyncProcessor)) {
+                    LOG.warn("Interceptor: {} at: {} does not return an 
AsyncProcessor instance."
+                             + " This causes the asynchronous routing engine 
to not work as optimal as possible."
+                             + " See more details at the InterceptStrategy 
javadoc."
+                             + " Camel will use a bridge to adapt the 
interceptor to the asynchronous routing engine,"
+                             + " but its not the most optimal solution. Please 
consider changing your interceptor to comply.",
+                            strategy, definition);
+                }
+                if (!(wrapped instanceof WrapAwareProcessor)) {
+                    // wrap the target so it becomes a service and we can 
manage its lifecycle
+                    wrapped = 
PluginHelper.getInternalProcessorFactory(camelContext)
+                            .createWrapProcessor(wrapped, target);
+                }
+                target = wrapped;
             }
-            target = wrapped;
         }
 
         if (route.isStreamCaching()) {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
index 53bd9b73214..dc814fd6f0f 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -31,6 +31,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
@@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A processor which catches exceptions.
  */
-public class CatchProcessor extends DelegateAsyncProcessor implements 
Traceable, IdAware, RouteIdAware {
+public class CatchProcessor extends DelegateAsyncProcessor implements 
Traceable, IdAware, RouteIdAware, InterceptableProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CatchProcessor.class);
 
@@ -110,6 +111,11 @@ public class CatchProcessor extends DelegateAsyncProcessor 
implements Traceable,
         return "catch";
     }
 
+    @Override
+    public boolean canIntercept() {
+        return false;
+    }
+
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         final Exception e = exchange.getException();
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index 8c8f0ab2da3..c8a2bf3d200 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -22,6 +22,7 @@ import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -31,7 +32,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Processor to handle do finally supporting asynchronous routing engine
  */
-public class FinallyProcessor extends DelegateAsyncProcessor implements 
Traceable, IdAware, RouteIdAware {
+public class FinallyProcessor extends DelegateAsyncProcessor
+        implements Traceable, IdAware, RouteIdAware, InterceptableProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FinallyProcessor.class);
 
@@ -90,6 +92,11 @@ public class FinallyProcessor extends DelegateAsyncProcessor 
implements Traceabl
         this.routeId = routeId;
     }
 
+    @Override
+    public boolean canIntercept() {
+        return false;
+    }
+
     private static final class FinallyAsyncCallback implements AsyncCallback {
 
         private final Exchange exchange;
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
index 8db60f2cfcc..2cb4a81953a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -29,6 +29,7 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -41,7 +42,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Implements try/catch/finally type processing
  */
-public class TryProcessor extends AsyncProcessorSupport implements 
Navigate<Processor>, Traceable, IdAware, RouteIdAware {
+public class TryProcessor extends AsyncProcessorSupport
+        implements Navigate<Processor>, Traceable, IdAware, RouteIdAware, 
InterceptableProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TryProcessor.class);
 
@@ -72,6 +74,11 @@ public class TryProcessor extends AsyncProcessorSupport 
implements Navigate<Proc
         return "doTry";
     }
 
+    @Override
+    public boolean canIntercept() {
+        return false;
+    }
+
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         reactiveExecutor.schedule(new TryState(exchange, callback));
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
index 264551b07ad..363fc66f5a7 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
@@ -45,6 +45,7 @@ public class InterceptReifier<T extends InterceptDefinition> 
extends ProcessorRe
             public Processor wrapProcessorInInterceptors(
                     CamelContext context, NamedNode definition, Processor 
target, Processor nextTarget)
                     throws Exception {
+
                 // store the target we are intercepting
                 this.interceptedTarget = target;
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
new file mode 100644
index 00000000000..f02e9b9cbef
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.intercept;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class InterceptDoTryCatchTest extends ContextTestSupport {
+
+    @Test
+    public void testIntercept() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        getMockEndpoint("mock:intercepted").expectedMessageCount(4);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                intercept().to("mock:intercepted");
+
+                from("direct:start")
+                    .to("mock:foo")
+                    .doTry()
+                        .throwException(new IllegalArgumentException("Forced"))
+                    .doCatch(Exception.class)
+                        .to("mock:bar")
+                    .end()
+                    .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to