This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch zip in repository https://gitbox.apache.org/repos/asf/camel.git
commit 877966cb9efcd855967e8f2f96269e4b3e5d60ad Author: Claus Ibsen <[email protected]> AuthorDate: Mon Sep 2 09:56:07 2024 +0200 CAMEL-21114: camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all splits in transacted mode. --- .../apache/camel/processor/MulticastProcessor.java | 29 +++++++++++++++++----- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index da2d59d1067..0cb99b16e79 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -147,7 +147,15 @@ public class MulticastProcessor extends AsyncProcessorSupport @Override public void execute(Runnable command) { - schedule(command); + schedule(command, false); + } + } + + private final class SyncScheduler implements Executor { + + @Override + public void execute(Runnable command) { + schedule(command, true); } } @@ -172,6 +180,7 @@ public class MulticastProcessor extends AsyncProcessorSupport private final ExecutorService executorService; private final boolean shutdownExecutorService; private final Scheduler scheduler = new Scheduler(); + private final SyncScheduler syncScheduler = new SyncScheduler(); private ExecutorService aggregateExecutorService; private boolean shutdownAggregateExecutorService; private final long timeout; @@ -370,6 +379,10 @@ public class MulticastProcessor extends AsyncProcessorSupport } protected void schedule(final Runnable runnable) { + schedule(runnable, false); + } + + protected void schedule(final Runnable runnable, boolean sync) { if (isParallelProcessing()) { Runnable task = prepareParallelTask(runnable); try { @@ -379,6 +392,8 @@ public class MulticastProcessor extends AsyncProcessorSupport rej.reject(); } } + } else if (sync) { + reactiveExecutor.scheduleSync(runnable); } else { reactiveExecutor.schedule(runnable); } @@ -431,7 +446,8 @@ public class MulticastProcessor extends AsyncProcessorSupport final Map<String, String> mdc; final ScheduledFuture<?> timeoutTask; - MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int capacity) { + MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int capacity, + boolean sync) { this.original = original; this.pairs = pairs; this.callback = callback; @@ -450,9 +466,10 @@ public class MulticastProcessor extends AsyncProcessorSupport this.mdc = null; } if (capacity > 0) { - this.completion = new AsyncCompletionService<>(scheduler, !isStreaming(), lock, capacity); + this.completion + = new AsyncCompletionService<>(sync ? syncScheduler : scheduler, !isStreaming(), lock, capacity); } else { - this.completion = new AsyncCompletionService<>(scheduler, !isStreaming(), lock); + this.completion = new AsyncCompletionService<>(sync ? syncScheduler : scheduler, !isStreaming(), lock); } } @@ -557,7 +574,7 @@ public class MulticastProcessor extends AsyncProcessorSupport public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int size) { - super(original, pairs, callback, size); + super(original, pairs, callback, size, false); } @Override @@ -654,7 +671,7 @@ public class MulticastProcessor extends AsyncProcessorSupport public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int size) { - super(original, pairs, callback, size); + super(original, pairs, callback, size, true); } @Override
