This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit f57e6e384fccb0ac8b6836a9ad7df167aa3e0643 Author: Claus Ibsen <[email protected]> AuthorDate: Mon May 4 11:25:16 2020 +0200 CAMEL-14996: Splitter/Multicast EIP can cause a thread to starve from endless stackframes when splitting as it does not collapse its stackframes but keep scheduling for next split/task. --- .../apache/camel/processor/MulticastProcessor.java | 67 +++++++++++----------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index f32ffa8..63106f2 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -74,8 +74,6 @@ import static org.apache.camel.util.ObjectHelper.notNull; /** * Implements the Multicast pattern to send a message exchange to a number of * endpoints, each endpoint receiving a copy of the message exchange. - * - * @see Pipeline */ public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware { @@ -309,9 +307,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat this.lock = new ReentrantLock(); this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock); this.result = new AtomicReference<>(); - if (timeout > 0) { - schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); - } } @Override @@ -414,34 +409,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } } - protected void timeout() { - Lock lock = this.lock; - if (lock.tryLock()) { - try { - while (nbAggregated.get() < nbExchangeSent.get()) { - Exchange exchange = completion.pollUnordered(); - int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get(); - while (nbAggregated.get() < index) { - AggregationStrategy strategy = getAggregationStrategy(null); - strategy.timeout(result.get() != null ? result.get() : original, - nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout); - } - if (exchange != null) { - doAggregate(result, exchange, original); - nbAggregated.incrementAndGet(); - } - } - doDone(result.get(), true); - } catch (Throwable e) { - original.setException(e); - // and do the done work - doDone(null, false); - } finally { - lock.unlock(); - } - } - } - protected void doDone(Exchange exchange, boolean forceExhaust) { if (done.compareAndSet(false, true)) { MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust); @@ -453,6 +420,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat MulticastParallelTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { super(original, pairs, callback); + if (timeout > 0) { + schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); + } } @Override @@ -532,6 +502,34 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat doDone(null, false); } } + + protected void timeout() { + Lock lock = this.lock; + if (lock.tryLock()) { + try { + while (nbAggregated.get() < nbExchangeSent.get()) { + Exchange exchange = completion.pollUnordered(); + int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get(); + while (nbAggregated.get() < index) { + AggregationStrategy strategy = getAggregationStrategy(null); + strategy.timeout(result.get() != null ? result.get() : original, + nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout); + } + if (exchange != null) { + doAggregate(result, exchange, original); + nbAggregated.incrementAndGet(); + } + } + doDone(result.get(), true); + } catch (Throwable e) { + original.setException(e); + // and do the done work + doDone(null, false); + } finally { + lock.unlock(); + } + } + } } protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) { @@ -895,6 +893,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat if (isParallelProcessing() && executorService == null) { throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); } + if (timeout > 0 && !isParallelProcessing()) { + throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); + } if (timeout > 0 && aggregateExecutorService == null) { // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run
