This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f57e6e384fccb0ac8b6836a9ad7df167aa3e0643
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon May 4 11:25:16 2020 +0200

    CAMEL-14996: Splitter/Multicast EIP can cause a thread to starve from 
endless stackframes when splitting as it does not collapse its stackframes but 
keep scheduling for next split/task.
---
 .../apache/camel/processor/MulticastProcessor.java | 67 +++++++++++-----------
 1 file changed, 34 insertions(+), 33 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f32ffa8..63106f2 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -74,8 +74,6 @@ import static org.apache.camel.util.ObjectHelper.notNull;
 /**
  * Implements the Multicast pattern to send a message exchange to a number of
  * endpoints, each endpoint receiving a copy of the message exchange.
- *
- * @see Pipeline
  */
 public class MulticastProcessor extends AsyncProcessorSupport implements 
Navigate<Processor>, Traceable, IdAware, RouteIdAware {
 
@@ -309,9 +307,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             this.lock = new ReentrantLock();
             this.completion = new 
AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), 
lock);
             this.result = new AtomicReference<>();
-            if (timeout > 0) {
-                schedule(aggregateExecutorService, this::timeout, timeout, 
TimeUnit.MILLISECONDS);
-            }
         }
 
         @Override
@@ -414,34 +409,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             }
         }
 
-        protected void timeout() {
-            Lock lock = this.lock;
-            if (lock.tryLock()) {
-                try {
-                    while (nbAggregated.get() < nbExchangeSent.get()) {
-                        Exchange exchange = completion.pollUnordered();
-                        int index = exchange != null ? 
getExchangeIndex(exchange) : nbExchangeSent.get();
-                        while (nbAggregated.get() < index) {
-                            AggregationStrategy strategy = 
getAggregationStrategy(null);
-                            strategy.timeout(result.get() != null ? 
result.get() : original,
-                                    nbAggregated.getAndIncrement(), 
nbExchangeSent.get(), timeout);
-                        }
-                        if (exchange != null) {
-                            doAggregate(result, exchange, original);
-                            nbAggregated.incrementAndGet();
-                        }
-                    }
-                    doDone(result.get(), true);
-                } catch (Throwable e) {
-                    original.setException(e);
-                    // and do the done work
-                    doDone(null, false);
-                } finally {
-                    lock.unlock();
-                }
-            }
-        }
-
         protected void doDone(Exchange exchange, boolean forceExhaust) {
             if (done.compareAndSet(false, true)) {
                 MulticastProcessor.this.doDone(original, exchange, pairs, 
callback, false, forceExhaust);
@@ -453,6 +420,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
 
         MulticastParallelTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             super(original, pairs, callback);
+            if (timeout > 0) {
+                schedule(aggregateExecutorService, this::timeout, timeout, 
TimeUnit.MILLISECONDS);
+            }
         }
 
         @Override
@@ -532,6 +502,34 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                 doDone(null, false);
             }
         }
+
+        protected void timeout() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    while (nbAggregated.get() < nbExchangeSent.get()) {
+                        Exchange exchange = completion.pollUnordered();
+                        int index = exchange != null ? 
getExchangeIndex(exchange) : nbExchangeSent.get();
+                        while (nbAggregated.get() < index) {
+                            AggregationStrategy strategy = 
getAggregationStrategy(null);
+                            strategy.timeout(result.get() != null ? 
result.get() : original,
+                                    nbAggregated.getAndIncrement(), 
nbExchangeSent.get(), timeout);
+                        }
+                        if (exchange != null) {
+                            doAggregate(result, exchange, original);
+                            nbAggregated.incrementAndGet();
+                        }
+                    }
+                    doDone(result.get(), true);
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
     }
 
     protected void schedule(Executor executor, Runnable runnable, long delay, 
TimeUnit unit) {
@@ -895,6 +893,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         if (isParallelProcessing() && executorService == null) {
             throw new IllegalArgumentException("ParallelProcessing is enabled 
but ExecutorService has not been set");
         }
+        if (timeout > 0 && !isParallelProcessing()) {
+            throw new IllegalArgumentException("Timeout is used but 
ParallelProcessing has not been enabled");
+        }
         if (timeout > 0 && aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly 
task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the 
aggregate task may not be able to run

Reply via email to