This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ed5cc8a6c9f Seda (#14703)
ed5cc8a6c9f is described below

commit ed5cc8a6c9f54a4fdf25a2eed03fa528c8b0f9a1
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jul 2 18:01:34 2024 +0200

    Seda (#14703)
    
    * CAMEL-20934: camel-seda - Sending to consumers should use callback for 
completion work to avoid thread-safety issues
    
    * Run test once
---
 .../apache/camel/component/seda/SedaConsumer.java  | 41 ++++++++++++----------
 .../camel/processor/WireTapAbortPolicyTest.java    |  4 +--
 2 files changed, 24 insertions(+), 21 deletions(-)

diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 1f04eded390..5fdb9188710 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -31,8 +32,6 @@ import org.apache.camel.Suspendable;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.EmptyAsyncCallback;
-import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -182,17 +181,19 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
                 }
                 if (exchange != null) {
                     try {
+                        final Exchange target = exchange;
                         // prepare the exchange before sending to consumer
-                        Exchange newExchange = prepareExchange(exchange);
+                        prepareExchange(target);
+                        // callback to be executed when sending to consumer 
and processing is done
+                        AsyncCallback callback = doneSync -> {
+                            // log exception if an exception occurred and was 
not handled
+                            if (target.getException() != null) {
+                                getExceptionHandler().handleException("Error 
processing exchange", target,
+                                        target.getException());
+                            }
+                        };
                         // process the exchange
-                        sendToConsumers(newExchange);
-                        // copy result back
-                        ExchangeHelper.copyResults(exchange, newExchange);
-                        // log exception if an exception occurred and was not 
handled
-                        if (exchange.getException() != null) {
-                            getExceptionHandler().handleException("Error 
processing exchange", exchange,
-                                    exchange.getException());
-                        }
+                        sendToConsumers(target, callback);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, e);
                     }
@@ -204,7 +205,6 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
             } catch (InterruptedException e) {
                 LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
                 Thread.currentThread().interrupt();
-                continue;
             } catch (Exception e) {
                 if (exchange != null) {
                     getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
@@ -218,14 +218,12 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
     /**
      * Strategy to prepare exchange for being processed by this consumer
      *
-     * @param  exchange the exchange
-     * @return          the exchange to process by this consumer.
+     * @param exchange the exchange
      */
-    protected Exchange prepareExchange(Exchange exchange) {
+    protected void prepareExchange(Exchange exchange) {
         // this consumer grabbed the exchange so mark its from this 
route/endpoint
         exchange.getExchangeExtension().setFromEndpoint(getEndpoint());
         exchange.getExchangeExtension().setFromRouteId(getRouteId());
-        return exchange;
     }
 
     /**
@@ -237,9 +235,10 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
      * If there is only a single consumer then its dispatched directly to it 
using same thread.
      *
      * @param  exchange  the exchange
+     * @param  callback  exchange callback to continue routing
      * @throws Exception can be thrown if processing of the exchange failed
      */
-    protected void sendToConsumers(final Exchange exchange) throws Exception {
+    protected void sendToConsumers(final Exchange exchange, final 
AsyncCallback callback) throws Exception {
         // validate multiple consumers has been enabled
         int size = getEndpoint().getConsumers().size();
         if (size > 1 && !getEndpoint().isMultipleConsumersSupported()) {
@@ -263,11 +262,15 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
             // and use the asynchronous routing engine to support it
             mp.process(exchange, doneSync -> {
                 // done the uow on the completions
-                UnitOfWorkHelper.doneSynchronizations(exchange, completions);
+                try {
+                    UnitOfWorkHelper.doneSynchronizations(exchange, 
completions);
+                } finally {
+                    callback.done(doneSync);
+                }
             });
         } else {
             // use the regular processor and use the asynchronous routing 
engine to support it
-            getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
+            getAsyncProcessor().process(exchange, callback);
         }
     }
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 81b279f21dd..d96527a0f8d 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -29,7 +29,7 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.parallel.Isolated;
 
@@ -59,7 +59,7 @@ public class WireTapAbortPolicyTest extends 
ContextTestSupport {
         }
     }
 
-    @RepeatedTest(value = 1000)
+    @Test
     public void testSend() throws Exception {
         // hello must come first, as we have delay on the tapped route
         result.expectedMinimumMessageCount(2);

Reply via email to