This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch when in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8453bdc28fe8a91beff51621a160ca5964b69a0a Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jan 15 16:05:45 2025 +0100 CAMEL-21620: camel-core - Fix onWhen to not include outputs in model --- .../InterceptSendToEndpointProcessor.java | 69 ++++------------------ 1 file changed, 12 insertions(+), 57 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java index ec89fe63715..ee91f12a9fc 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java @@ -75,15 +75,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { endpoint.getBefore(), exchange); } exchange.setProperty(ExchangePropertyKey.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); - return pipeline.process(exchange, doneSync -> callback(exchange, callback, doneSync)); - - // if (pipeline != null) { - // detour the exchange with the pipeline that has before and after included - // return pipeline.process(exchange, callback); - // } - // - // return callback(exchange, callback, true); } private boolean callback(Exchange exchange, AsyncCallback callback, boolean doneSync) { @@ -98,14 +90,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { boolean shouldSkip = skip; // if then interceptor has predicate, then we should only skip if matched - Boolean whenMatches; - if (endpoint.getAfter() != null) { - // only get the property as after also needs to check this property - whenMatches = (Boolean) exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); - } else { - // remove property as it's no longer needed - whenMatches = (Boolean) exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); - } + Boolean whenMatches = (Boolean) exchange.getProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); if (whenMatches != null) { shouldSkip = skip && whenMatches; } @@ -113,27 +98,18 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { if (!shouldSkip) { ExchangeHelper.prepareOutToIn(exchange); - AsyncCallback ac = new AsyncCallback() { - @Override - public void done(boolean doneSync) { - try { - if (whenMatches == null || whenMatches) { - if (endpoint.getAfter() != null) { - endpoint.getAfter().process(exchange); - } - } - ExchangeHelper.prepareOutToIn(exchange); - } catch (Exception e) { - exchange.setException(e); - } finally { - exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); - callback.done(doneSync); - } - } + AsyncCallback ac1 = doneSync1 -> { + exchange.removeProperty(ExchangePropertyKey.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); + callback.done(doneSync1); }; + AsyncCallback ac2 = null; + if (after != null && (whenMatches == null || whenMatches)) { + ac2 = doneSync2 -> after.process(exchange, ac1); + } - // route to original destination leveraging the asynchronous routing engine if possible - boolean s = producer.process(exchange, ac); + // route to original destination (using producer) and when done, then + // optional route to the after processor + boolean s = producer.process(exchange, ac2 != null ? ac2 : ac1); return doneSync && s; } else { if (LOG.isDebugEnabled()) { @@ -153,35 +129,14 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { @Override protected void doBuild() throws Exception { CamelContextAware.trySetCamelContext(producer, endpoint.getCamelContext()); - // build pipeline with before/after processors - /* if (endpoint.getBefore() != null || endpoint.getAfter() != null) { - // detour the exchange using synchronous processing - AsyncProcessor before = null; - if (endpoint.getBefore() != null) { - before = AsyncProcessorConverterHelper.convert(endpoint.getBefore()); - } - AsyncProcessor ascb = new AsyncProcessorSupport() { - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - return callback(exchange, callback, true); - } - }; - // only execute the after if the intercept when predicate matches - final FilterProcessor filter = createFilterProcessor(); - pipeline = new Pipeline(getEndpoint().getCamelContext(), Arrays.asList(before, ascb, filter)); - }*/ - pipeline = createFilterProcessor(); + pipeline = new FilterProcessor(getEndpoint().getCamelContext(), onWhen, endpoint.getBefore()); if (endpoint.getAfter() != null) { after = AsyncProcessorConverterHelper.convert(endpoint.getAfter()); } ServiceHelper.buildService(producer, pipeline, after); } - private FilterProcessor createFilterProcessor() { - return new FilterProcessor(getEndpoint().getCamelContext(), onWhen, endpoint.getBefore()); - } - @Override protected void doInit() throws Exception { ServiceHelper.initService(producer, pipeline, after);
