This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch zip
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 877966cb9efcd855967e8f2f96269e4b3e5d60ad
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Sep 2 09:56:07 2024 +0200

    CAMEL-21114: camel-zipfile - ZipSplitter with AggregationStrategy does not 
aggregate all splits in transacted mode.
---
 .../apache/camel/processor/MulticastProcessor.java | 29 +++++++++++++++++-----
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index da2d59d1067..0cb99b16e79 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -147,7 +147,15 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
         @Override
         public void execute(Runnable command) {
-            schedule(command);
+            schedule(command, false);
+        }
+    }
+
+    private final class SyncScheduler implements Executor {
+
+        @Override
+        public void execute(Runnable command) {
+            schedule(command, true);
         }
     }
 
@@ -172,6 +180,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
     private final Scheduler scheduler = new Scheduler();
+    private final SyncScheduler syncScheduler = new SyncScheduler();
     private ExecutorService aggregateExecutorService;
     private boolean shutdownAggregateExecutorService;
     private final long timeout;
@@ -370,6 +379,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     }
 
     protected void schedule(final Runnable runnable) {
+        schedule(runnable, false);
+    }
+
+    protected void schedule(final Runnable runnable, boolean sync) {
         if (isParallelProcessing()) {
             Runnable task = prepareParallelTask(runnable);
             try {
@@ -379,6 +392,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                     rej.reject();
                 }
             }
+        } else if (sync) {
+            reactiveExecutor.scheduleSync(runnable);
         } else {
             reactiveExecutor.schedule(runnable);
         }
@@ -431,7 +446,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         final Map<String, String> mdc;
         final ScheduledFuture<?> timeoutTask;
 
-        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> 
pairs, AsyncCallback callback, int capacity) {
+        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> 
pairs, AsyncCallback callback, int capacity,
+                      boolean sync) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -450,9 +466,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 this.mdc = null;
             }
             if (capacity > 0) {
-                this.completion = new AsyncCompletionService<>(scheduler, 
!isStreaming(), lock, capacity);
+                this.completion
+                        = new AsyncCompletionService<>(sync ? syncScheduler : 
scheduler, !isStreaming(), lock, capacity);
             } else {
-                this.completion = new AsyncCompletionService<>(scheduler, 
!isStreaming(), lock);
+                this.completion = new AsyncCompletionService<>(sync ? 
syncScheduler : scheduler, !isStreaming(), lock);
             }
         }
 
@@ -557,7 +574,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
         public MulticastReactiveTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
                                      int size) {
-            super(original, pairs, callback, size);
+            super(original, pairs, callback, size, false);
         }
 
         @Override
@@ -654,7 +671,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
         public MulticastTransactedTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
                                        int size) {
-            super(original, pairs, callback, size);
+            super(original, pairs, callback, size, true);
         }
 
         @Override

Reply via email to