This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch when
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 784602a7c47e4fa652398d841a4519dc5a350e3a
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jan 15 15:54:30 2025 +0100

    CAMEL-21620: camel-core - Fix onWhen to not include outputs in model
---
 .../InterceptSendToEndpointProcessor.java          | 68 +++++++++++++---------
 1 file changed, 40 insertions(+), 28 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index fc0495186a1..ec89fe63715 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Arrays;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.AsyncProducer;
@@ -26,9 +24,9 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
 import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultInterceptSendToEndpoint;
 import org.apache.camel.support.ExchangeHelper;
@@ -52,6 +50,7 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     private final boolean skip;
     private final Predicate onWhen;
     private AsyncProcessor pipeline;
+    private AsyncProcessor after;
 
     public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, 
Endpoint delegate, AsyncProducer producer,
                                             boolean skip, Predicate onWhen) {
@@ -77,12 +76,14 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
         }
         exchange.setProperty(ExchangePropertyKey.INTERCEPTED_ENDPOINT, 
delegate.getEndpointUri());
 
-        if (pipeline != null) {
-            // detour the exchange with the pipeline that has before and after 
included
-            return pipeline.process(exchange, callback);
-        }
+        return pipeline.process(exchange, doneSync -> callback(exchange, 
callback, doneSync));
 
-        return callback(exchange, callback, true);
+        //        if (pipeline != null) {
+        //             detour the exchange with the pipeline that has before 
and after included
+        //            return pipeline.process(exchange, callback);
+        //        }
+        //
+        //        return callback(exchange, callback, true);
     }
 
     private boolean callback(Exchange exchange, AsyncCallback callback, 
boolean doneSync) {
@@ -103,7 +104,7 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
             whenMatches = (Boolean) 
exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
         } else {
             // remove property as it's no longer needed
-            whenMatches = (Boolean) 
exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+            whenMatches = (Boolean) 
exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
         }
         if (whenMatches != null) {
             shouldSkip = skip && whenMatches;
@@ -112,10 +113,27 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
         if (!shouldSkip) {
             ExchangeHelper.prepareOutToIn(exchange);
 
+            AsyncCallback ac = new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    try {
+                        if (whenMatches == null || whenMatches) {
+                            if (endpoint.getAfter() != null) {
+                                endpoint.getAfter().process(exchange);
+                            }
+                        }
+                        ExchangeHelper.prepareOutToIn(exchange);
+                    } catch (Exception e) {
+                        exchange.setException(e);
+                    } finally {
+                        
exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+                        callback.done(doneSync);
+                    }
+                }
+            };
+
             // route to original destination leveraging the asynchronous 
routing engine if possible
-            boolean s = producer.process(exchange, ds -> {
-                callback.done(doneSync && ds);
-            });
+            boolean s = producer.process(exchange, ac);
             return doneSync && s;
         } else {
             if (LOG.isDebugEnabled()) {
@@ -136,7 +154,7 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     protected void doBuild() throws Exception {
         CamelContextAware.trySetCamelContext(producer, 
endpoint.getCamelContext());
         // build pipeline with before/after processors
-        if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
+        /*        if (endpoint.getBefore() != null || endpoint.getAfter() != 
null) {
             // detour the exchange using synchronous processing
             AsyncProcessor before = null;
             if (endpoint.getBefore() != null) {
@@ -151,33 +169,27 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
             // only execute the after if the intercept when predicate matches
             final FilterProcessor filter = createFilterProcessor();
             pipeline = new Pipeline(getEndpoint().getCamelContext(), 
Arrays.asList(before, ascb, filter));
-        }
-
-        ServiceHelper.buildService(producer, pipeline);
-    }
+        }*/
 
-    private FilterProcessor createFilterProcessor() {
-        Predicate predicate = exchange -> {
-            onWhen.matches(exchange);
-            Boolean whenMatches
-                    = (Boolean) 
exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
-            return whenMatches == null || whenMatches;
-        };
-        AsyncProcessor after = null;
+        pipeline = createFilterProcessor();
         if (endpoint.getAfter() != null) {
             after = AsyncProcessorConverterHelper.convert(endpoint.getAfter());
         }
-        return new FilterProcessor(getEndpoint().getCamelContext(), predicate, 
after);
+        ServiceHelper.buildService(producer, pipeline, after);
+    }
+
+    private FilterProcessor createFilterProcessor() {
+        return new FilterProcessor(getEndpoint().getCamelContext(), onWhen, 
endpoint.getBefore());
     }
 
     @Override
     protected void doInit() throws Exception {
-        ServiceHelper.initService(producer, pipeline);
+        ServiceHelper.initService(producer, pipeline, after);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(producer, pipeline);
+        ServiceHelper.startService(producer, pipeline, after);
     }
 
     @Override

Reply via email to