This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit dcff7fee1df493229d1ac114ca73a3957c7174b0 Author: Claus Ibsen <[email protected]> AuthorDate: Mon May 4 12:38:10 2020 +0200 CAMEL-14996: Splitter/Multicast EIP can cause a thread to starve from endless stackframes when splitting as it does not collapse its stackframes but keep scheduling for next split/task. --- .../apache/camel/processor/MulticastProcessor.java | 72 +++++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 63106f2..d0b1e2c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; @@ -285,6 +286,66 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } + private interface MulticastCompletionService { + + Exchange poll(); + + Exchange pollUnordered(); + + void submit(Consumer<Consumer<Exchange>> runner); + + } + + private class MulticastCompletionServiceParallelTask implements MulticastCompletionService { + private final AsyncCompletionService<Exchange> completion; + + public MulticastCompletionServiceParallelTask(ReentrantLock lock) { + this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock);; + } + + @Override + public Exchange poll() { + return completion.poll(); + } + + @Override + public Exchange pollUnordered() { + return completion.pollUnordered(); + } + + @Override + public void submit(Consumer<Consumer<Exchange>> 
runner) { + completion.submit(runner); + } + } + + private class MulticastCompletionServiceTask implements MulticastCompletionService { + + private final AtomicReference<Exchange> exchange = new AtomicReference<>(); + + public MulticastCompletionServiceTask() { + } + + @Override + public Exchange poll() { + return exchange.getAndSet(null); + } + + @Override + public Exchange pollUnordered() { + return exchange.getAndSet(null); + } + + @Override + public void submit(Consumer<Consumer<Exchange>> runner) { + runner.accept(this::setResult); + } + + private void setResult(Exchange result) { + this.exchange.set(result); + } + } + protected class MulticastTask implements Runnable { final Exchange original; @@ -292,7 +353,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat final AsyncCallback callback; final Iterator<ProcessorExchangePair> iterator; final ReentrantLock lock; - final AsyncCompletionService<Exchange> completion; + MulticastCompletionService completion; final AtomicReference<Exchange> result; final AtomicInteger nbExchangeSent = new AtomicInteger(); final AtomicInteger nbAggregated = new AtomicInteger(); @@ -305,7 +366,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat this.callback = callback; this.iterator = pairs.iterator(); this.lock = new ReentrantLock(); - this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock); + this.completion = new MulticastCompletionServiceTask(); this.result = new AtomicReference<>(); } @@ -314,6 +375,10 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat return "MulticastTask"; } + private Exchange completionPoll() { + return completion.poll(); + } + @Override public void run() { try { @@ -393,7 +458,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat if (lock.tryLock()) { try { Exchange exchange; - while (!done.get() && (exchange = completion.poll()) != null) { + while (!done.get() && (exchange = completionPoll()) != null) { doAggregate(result, exchange, original); if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) { doDone(result.get(), true); @@ -420,6 +485,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat MulticastParallelTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { super(original, pairs, callback); + this.completion = new MulticastCompletionServiceParallelTask(lock); if (timeout > 0) { schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); }
