This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new 8c5c40b  CAMEL-16629: camel-core - InterceptSendToEndpoint - AfterUri 
should only trigger if when was true
8c5c40b is described below

commit 8c5c40b0f40aa9d7dcfe6cd161517b8af8026f4e
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed May 19 07:33:24 2021 +0200

    CAMEL-16629: camel-core - InterceptSendToEndpoint - AfterUri should only 
trigger if when was true
---
 .../InterceptSendToEndpointProcessor.java          | 78 +++++++++++++++-------
 .../InterceptSendToEndpointAfterTest.java          | 23 +++++++
 2 files changed, 77 insertions(+), 24 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index 6a8187b..834a713 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -21,8 +21,10 @@ import java.util.Arrays;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.AsyncProducer;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
 import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -46,6 +48,7 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     private final Endpoint delegate;
     private final AsyncProducer producer;
     private final boolean skip;
+    private AsyncProcessor pipeline;
 
     public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, 
Endpoint delegate, AsyncProducer producer,
                                             boolean skip) {
@@ -71,24 +74,9 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
         // add header with the real endpoint uri
         exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, 
delegate.getEndpointUri());
 
-        if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
-            // detour the exchange using synchronous processing
-            AsyncProcessor before = null;
-            if (endpoint.getBefore() != null) {
-                before = 
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
-            }
-            AsyncProcessor ascb = new AsyncProcessorSupport() {
-                @Override
-                public boolean process(Exchange exchange, AsyncCallback 
callback) {
-                    return callback(exchange, callback, true);
-                }
-            };
-            AsyncProcessor after = null;
-            if (endpoint.getAfter() != null) {
-                after = 
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
-            }
-
-            return new Pipeline(exchange.getContext(), Arrays.asList(before, 
ascb, after)).process(exchange, callback);
+        if (pipeline != null) {
+            // detour the exchange with the pipeline that has before and after 
included
+            return pipeline.process(exchange, callback);
         }
 
         return callback(exchange, callback, true);
@@ -106,7 +94,14 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
         boolean shouldSkip = skip;
 
         // if then interceptor had a when predicate, then we should only skip 
if it matched
-        Boolean whenMatches = (Boolean) 
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+        Boolean whenMatches;
+        if (endpoint.getAfter() != null) {
+            // only get the property as after also needs to check this property
+            whenMatches = (Boolean) 
exchange.getProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+        } else {
+            // remove property as its not longer needed
+            whenMatches = (Boolean) 
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+        }
         if (whenMatches != null) {
             shouldSkip = skip && whenMatches;
         }
@@ -139,17 +134,52 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     }
 
     @Override
-    public void start() {
-        ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter());
-        // here we also need to start the producer
-        ServiceHelper.startService(producer);
+    protected void doInit() throws Exception {
+        CamelContextAware.trySetCamelContext(producer, 
endpoint.getCamelContext());
+        // build pipeline with before/after processors
+        if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
+            // detour the exchange using synchronous processing
+            AsyncProcessor before = null;
+            if (endpoint.getBefore() != null) {
+                before = 
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
+            }
+            AsyncProcessor ascb = new AsyncProcessorSupport() {
+                @Override
+                public boolean process(Exchange exchange, AsyncCallback 
callback) {
+                    return callback(exchange, callback, true);
+                }
+            };
+            // only execute the after if the intercept when predicate matches
+            Predicate predicate = exchange -> {
+                Boolean whenMatches
+                        = (Boolean) 
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+                return whenMatches == null || whenMatches;
+            };
+            AsyncProcessor after = null;
+            if (endpoint.getAfter() != null) {
+                after = 
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
+            }
+            FilterProcessor filter = new 
FilterProcessor(getEndpoint().getCamelContext(), predicate, after);
+            pipeline = new Pipeline(getEndpoint().getCamelContext(), 
Arrays.asList(before, ascb, filter));
+        }
+
+        ServiceHelper.initService(producer, pipeline);
     }
 
     @Override
-    public void stop() {
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(producer, pipeline);
+    }
+
+    @Override
+    public void doStop() {
         // do not stop before/after as it should only be stopped when the 
interceptor stops
         // we should stop the producer here
         ServiceHelper.stopService(producer);
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(producer, pipeline);
+    }
 }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
index 32630a8..4a5852c 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
@@ -100,4 +100,27 @@ public class InterceptSendToEndpointAfterTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testInterceptEndpointWhen() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start").when(simple("${body} 
contains 'World'")).to("mock:detour")
+                        .afterUrl("mock:after");
+
+                from("direct:start").to("mock:foo").transform().constant("Bye 
World");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World", "Hi 
Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "Hi Camel");
+
+        assertMockEndpointsSatisfied();
+    }
+
 }

Reply via email to