This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch seda in repository https://gitbox.apache.org/repos/asf/camel.git
commit fffa87bb259087a4d7b5290435f64f3bbaf1e6b1 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jul 2 14:57:25 2024 +0200 CAMEL-20934: camel-seda - Sending to consumers should use callback for completion work to avoid thread-safety issues --- .../apache/camel/component/seda/SedaConsumer.java | 41 ++++++++++++---------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 1f04eded390..5fdb9188710 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -31,8 +32,6 @@ import org.apache.camel.Suspendable; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultConsumer; -import org.apache.camel.support.EmptyAsyncCallback; -import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -182,17 +181,19 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA } if (exchange != null) { try { + final Exchange target = exchange; // prepare the exchange before sending to consumer - Exchange newExchange = prepareExchange(exchange); + prepareExchange(target); + // callback to be executed when sending to consumer and processing is done + AsyncCallback callback = doneSync -> { + // log exception if an exception occurred and was not handled + if (target.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", target, + target.getException()); + } + }; // process the exchange - sendToConsumers(newExchange); - // copy result back - ExchangeHelper.copyResults(exchange, newExchange); - // log exception if an exception occurred and was not handled - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, - exchange.getException()); - } + sendToConsumers(target, callback); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); } @@ -204,7 +205,6 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA } catch (InterruptedException e) { LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); Thread.currentThread().interrupt(); - continue; } catch (Exception e) { if (exchange != null) { getExceptionHandler().handleException("Error processing exchange", exchange, e); @@ -218,14 +218,12 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA /** * Strategy to prepare exchange for being processed by this consumer * - * @param exchange the exchange - * @return the exchange to process by this consumer. + * @param exchange the exchange */ - protected Exchange prepareExchange(Exchange exchange) { + protected void prepareExchange(Exchange exchange) { // this consumer grabbed the exchange so mark its from this route/endpoint exchange.getExchangeExtension().setFromEndpoint(getEndpoint()); exchange.getExchangeExtension().setFromRouteId(getRouteId()); - return exchange; } /** @@ -237,9 +235,10 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA * If there is only a single consumer then its dispatched directly to it using same thread. * * @param exchange the exchange + * @param callback exchange callback to continue routing * @throws Exception can be thrown if processing of the exchange failed */ - protected void sendToConsumers(final Exchange exchange) throws Exception { + protected void sendToConsumers(final Exchange exchange, final AsyncCallback callback) throws Exception { // validate multiple consumers has been enabled int size = getEndpoint().getConsumers().size(); if (size > 1 && !getEndpoint().isMultipleConsumersSupported()) { @@ -263,11 +262,15 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA // and use the asynchronous routing engine to support it mp.process(exchange, doneSync -> { // done the uow on the completions - UnitOfWorkHelper.doneSynchronizations(exchange, completions); + try { + UnitOfWorkHelper.doneSynchronizations(exchange, completions); + } finally { + callback.done(doneSync); + } }); } else { // use the regular processor and use the asynchronous routing engine to support it - getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); + getAsyncProcessor().process(exchange, callback); } }
