Author: davsclaus
Date: Mon Jan 10 10:18:58 2011
New Revision: 1057141

URL: http://svn.apache.org/viewvc?rev=1057141&view=rev
Log:
CAMEL-3497: Fixed a rare potential deadlock where the aggregate task was not 
given time to run because the thread pool was overloaded when running 
multicast/splitter in parallel mode.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1057141&r1=1057140&r2=1057141&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Jan 10 10:18:58 2011
@@ -882,10 +882,13 @@ public class MulticastProcessor extends 
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
         }
         if (isParallelProcessing() && aggregateExecutorService == null) {
-            // use cached thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
+            // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run
             // and signal completion during processing, which would lead to a dead-lock
-            aggregateExecutorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "AggregateTask");
+            // keep at least one thread in the pool so we re-use the thread avoiding to create new threads because
+            // the pool shrank to zero.
+            String name = getClass().getSimpleName() + "-AggregateTask";
+            aggregateExecutorService = camelContext.getExecutorServiceStrategy().newThreadPool(this, name, 1, Integer.MAX_VALUE);
         }
         ServiceHelper.startServices(processors);
     }
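
For illustration only, the idea behind the change can be sketched with plain
java.util.concurrent classes. The class and method names below
(AggregatePoolSketch, newAggregatePool, allTasksStart) are hypothetical, and
whether ExecutorServiceStrategy.newThreadPool builds exactly this kind of
ThreadPoolExecutor (direct hand-off queue, 60 second keep-alive) is an
assumption, not something this commit shows. The sketch keeps one core thread
and lets the pool grow without bound, so a submitted task should always get a
thread instead of waiting behind tasks that are themselves blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AggregatePoolSketch {

    // Hypothetical helper: 1 core thread, unbounded maximum, direct hand-off.
    // Same shape as Executors.newCachedThreadPool(), except the core size is 1
    // instead of 0, so one thread is kept alive for re-use.
    static ExecutorService newAggregatePool() {
        return new ThreadPoolExecutor(1, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    // Submits `tasks` tasks that each block until every task has started.
    // Returns true only if the pool gave all of them a thread within one second.
    static boolean allTasksStart(ExecutorService pool, int tasks) {
        final CountDownLatch started = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        started.countDown();
                        try {
                            // Block like a task waiting for others to signal completion.
                            started.await(3, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            return started.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Interrupt blocked workers and drop anything still queued.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("unbounded pool, all tasks started: "
                + allTasksStart(newAggregatePool(), 4));
        System.out.println("single-thread pool, all tasks started: "
                + allTasksStart(Executors.newFixedThreadPool(1), 4));
    }
}
```

The single-thread pool in main is there only for contrast: its queued tasks
cannot start while the one worker is blocked, which is the kind of starvation
the log message describes.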

