This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch "when" in repository https://gitbox.apache.org/repos/asf/camel.git
commit 784602a7c47e4fa652398d841a4519dc5a350e3a Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jan 15 15:54:30 2025 +0100 CAMEL-21620: camel-core - Fix onWhen to not include outputs in model --- .../InterceptSendToEndpointProcessor.java | 68 +++++++++++++--------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java index fc0495186a1..ec89fe63715 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java @@ -16,8 +16,6 @@ */ package org.apache.camel.processor; -import java.util.Arrays; - import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.AsyncProducer; @@ -26,9 +24,9 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Predicate; +import org.apache.camel.Processor; import org.apache.camel.spi.InterceptSendToEndpoint; import org.apache.camel.support.AsyncProcessorConverterHelper; -import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.DefaultInterceptSendToEndpoint; import org.apache.camel.support.ExchangeHelper; @@ -52,6 +50,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { private final boolean skip; private final Predicate onWhen; private AsyncProcessor pipeline; + private AsyncProcessor after; public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip, Predicate onWhen) { @@ -77,12 +76,14 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer { } exchange.setProperty(ExchangePropertyKey.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); - if (pipeline != null) { - // detour the exchange with the pipeline that has before and after included - return pipeline.process(exchange, callback); - } + return pipeline.process(exchange, doneSync -> callback(exchange, callback, doneSync)); - return callback(exchange, callback, true); + // if (pipeline != null) { + // detour the exchange with the pipeline that has before and after included + // return pipeline.process(exchange, callback); + // } + // + // return callback(exchange, callback, true); } private boolean callback(Exchange exchange, AsyncCallback callback, boolean doneSync) { @@ -103,7 +104,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { whenMatches = (Boolean) exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); } else { // remove property as it's no longer needed - whenMatches = (Boolean) exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); + whenMatches = (Boolean) exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); } if (whenMatches != null) { shouldSkip = skip && whenMatches; @@ -112,10 +113,27 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { if (!shouldSkip) { ExchangeHelper.prepareOutToIn(exchange); + AsyncCallback ac = new AsyncCallback() { + @Override + public void done(boolean doneSync) { + try { + if (whenMatches == null || whenMatches) { + if (endpoint.getAfter() != null) { + endpoint.getAfter().process(exchange); + } + } + ExchangeHelper.prepareOutToIn(exchange); + } catch (Exception e) { + exchange.setException(e); + } finally { + exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); + callback.done(doneSync); + } + } + }; + // route to original destination leveraging the asynchronous routing engine if possible - boolean s = 
producer.process(exchange, ds -> { - callback.done(doneSync && ds); - }); + boolean s = producer.process(exchange, ac); return doneSync && s; } else { if (LOG.isDebugEnabled()) { @@ -136,7 +154,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { protected void doBuild() throws Exception { CamelContextAware.trySetCamelContext(producer, endpoint.getCamelContext()); // build pipeline with before/after processors - if (endpoint.getBefore() != null || endpoint.getAfter() != null) { + /* if (endpoint.getBefore() != null || endpoint.getAfter() != null) { // detour the exchange using synchronous processing AsyncProcessor before = null; if (endpoint.getBefore() != null) { @@ -151,33 +169,27 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { // only execute the after if the intercept when predicate matches final FilterProcessor filter = createFilterProcessor(); pipeline = new Pipeline(getEndpoint().getCamelContext(), Arrays.asList(before, ascb, filter)); - } - - ServiceHelper.buildService(producer, pipeline); - } + }*/ - private FilterProcessor createFilterProcessor() { - Predicate predicate = exchange -> { - onWhen.matches(exchange); - Boolean whenMatches - = (Boolean) exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); - return whenMatches == null || whenMatches; - }; - AsyncProcessor after = null; + pipeline = createFilterProcessor(); if (endpoint.getAfter() != null) { after = AsyncProcessorConverterHelper.convert(endpoint.getAfter()); } - return new FilterProcessor(getEndpoint().getCamelContext(), predicate, after); + ServiceHelper.buildService(producer, pipeline, after); + } + + private FilterProcessor createFilterProcessor() { + return new FilterProcessor(getEndpoint().getCamelContext(), onWhen, endpoint.getBefore()); } @Override protected void doInit() throws Exception { - ServiceHelper.initService(producer, pipeline); + ServiceHelper.initService(producer, pipeline, after); } 
@Override protected void doStart() throws Exception { - ServiceHelper.startService(producer, pipeline); + ServiceHelper.startService(producer, pipeline, after); } @Override
