Author: davsclaus
Date: Mon Jan 10 09:59:22 2011
New Revision: 1057139
URL: http://svn.apache.org/viewvc?rev=1057139&view=rev
Log:
CAMEL-3497: Fixed rare potential deadlock issue with the aggregate task not being
given time to run due to the thread pool being overloaded when running in parallel
mode on multicast/splitter.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Mon Jan 10 09:59:22 2011
@@ -35,6 +35,7 @@ import org.apache.camel.WaitForTaskToCom
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.ServiceHelper;
/**
* An implementation of the <a
@@ -103,7 +104,11 @@ public class SedaEndpoint extends Defaul
return conumserMulticastProcessor;
}
- protected synchronized void updateMulticastProcessor() {
+ protected synchronized void updateMulticastProcessor() throws Exception {
+ if (conumserMulticastProcessor != null) {
+ ServiceHelper.stopService(conumserMulticastProcessor);
+ }
+
int size = getConsumers().size();
if (size == 0 && multicastExecutor != null) {
// stop the multicastExecutor
@@ -118,7 +123,7 @@ public class SedaEndpoint extends Defaul
processors.add(consumer.getProcessor());
}
conumserMulticastProcessor = new MulticastProcessor(getCamelContext(),
processors, null, true, multicastExecutor, false, false, 0);
-
+ ServiceHelper.startService(conumserMulticastProcessor);
}
public void setQueue(BlockingQueue<Exchange> queue) {
@@ -203,12 +208,12 @@ public class SedaEndpoint extends Defaul
producers.remove(producer);
}
- void onStarted(SedaConsumer consumer) {
+ void onStarted(SedaConsumer consumer) throws Exception {
consumers.add(consumer);
updateMulticastProcessor();
}
- void onStopped(SedaConsumer consumer) {
+ void onStopped(SedaConsumer consumer) throws Exception {
consumers.remove(consumer);
updateMulticastProcessor();
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Mon Jan 10 09:59:22 2011
@@ -144,6 +144,7 @@ public class MulticastProcessor extends
private final boolean streaming;
private final boolean stopOnException;
private final ExecutorService executorService;
+ private ExecutorService aggregateExecutorService;
private final long timeout;
private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers
= new ConcurrentHashMap<PreparedErrorHandler, Processor>();
@@ -233,6 +234,7 @@ public class MulticastProcessor extends
final boolean streaming, final
AsyncCallback callback) throws Exception {
ObjectHelper.notNull(executorService, "ExecutorService", this);
+ ObjectHelper.notNull(aggregateExecutorService,
"AggregateExecutorService", this);
final CompletionService<Exchange> completion;
if (streaming) {
@@ -260,7 +262,7 @@ public class MulticastProcessor extends
aggregationOnTheFlyDone, allTasksSubmitted,
executionException);
// and start the aggregation task so we can aggregate on-the-fly
- executorService.submit(task);
+ aggregateExecutorService.submit(task);
}
LOG.trace("Starting to submit parallel tasks");
@@ -383,11 +385,10 @@ public class MulticastProcessor extends
}
} finally {
// must signal we are done so the latch can open and let the
other thread continue processing
- LOG.trace("Signaling we are done aggregating on the fly");
+ LOG.debug("Signaling we are done aggregating on the fly");
+ LOG.trace("Aggregate on the fly task +++ done +++");
aggregationOnTheFlyDone.countDown();
}
-
- LOG.trace("Aggregate on the fly task +++ done +++");
}
private void aggregateOnTheFly() throws InterruptedException,
ExecutionException {
@@ -880,6 +881,12 @@ public class MulticastProcessor extends
if (timeout > 0 && !isParallelProcessing()) {
throw new IllegalArgumentException("Timeout is used but
ParallelProcessing has not been enabled");
}
+ if (isParallelProcessing() && aggregateExecutorService == null) {
+ // use cached thread pool so we ensure the aggregate on-the-fly
task always will have assigned a thread
+ // and run the tasks when the task is submitted. If not then the
aggregate task may not be able to run
+ // and signal completion during processing, which would lead to a
dead-lock
+ aggregateExecutorService =
camelContext.getExecutorServiceStrategy().newCachedThreadPool(this,
"AggregateTask");
+ }
ServiceHelper.startServices(processors);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
Mon Jan 10 09:59:22 2011
@@ -144,13 +144,13 @@ public interface ExecutorServiceStrategy
/**
* Creates a new cached thread pool.
+ * <p/>
+ * <b>Important:</b> Using cached thread pool is discouraged as they have
no upper bound and can overload the JVM.
*
* @param source the source object, usually it should be
<tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
* @return the created thread pool
- * @deprecated using cached thread pool is discouraged as they have no
upper bound and can overload the JVM
*/
- @Deprecated
ExecutorService newCachedThreadPool(Object source, String name);
/**
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Mon Jan 10 09:59:22 2011
@@ -27,7 +27,7 @@ import java.util.concurrent.SynchronousQ
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.model.ExecutorServiceAwareDefinition;
import org.apache.camel.spi.ExecutorServiceStrategy;
@@ -49,12 +49,12 @@ import org.apache.camel.util.ObjectHelpe
public final class ExecutorServiceHelper {
public static final String DEFAULT_PATTERN = "Camel Thread ${counter} -
${name}";
- private static AtomicInteger threadCounter = new AtomicInteger();
+ private static AtomicLong threadCounter = new AtomicLong();
private ExecutorServiceHelper() {
}
- private static int nextThreadCounter() {
+ private static long nextThreadCounter() {
return threadCounter.getAndIncrement();
}
@@ -152,15 +152,15 @@ public final class ExecutorServiceHelper
}
/**
- * Creates a new cached thread pool
+ * Creates a new cached thread pool.
+ * <p/>
+ * <b>Important:</b> Using cached thread pool is discouraged as they have
no upper bound and can overload the JVM.
*
* @param pattern pattern of the thread name
* @param name ${name} in the pattern name
* @param daemon whether the threads is daemon or not
* @return the created pool
- * @deprecated using cached thread pool is discouraged as they have no
upper bound and can overload the JVM
*/
- @Deprecated
public static ExecutorService newCachedThreadPool(final String pattern,
final String name, final boolean daemon) {
return Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -187,7 +187,7 @@ public final class ExecutorServiceHelper
*
* @param pattern pattern of the thread name
* @param name ${name} in the pattern name
- * @param corePoolSize the core size
+ * @param corePoolSize the core pool size
* @param maxPoolSize the maximum pool size
* @return the created pool
*/
@@ -201,7 +201,7 @@ public final class ExecutorServiceHelper
*
* @param pattern pattern of the thread name
* @param name ${name} in the pattern name
- * @param corePoolSize the core size
+ * @param corePoolSize the core pool size
* @param maxPoolSize the maximum pool size
* @param maxQueueSize the maximum number of tasks in the queue, use
<tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
* @return the created pool
@@ -216,7 +216,7 @@ public final class ExecutorServiceHelper
*
* @param pattern pattern of the thread name
* @param name ${name} in the pattern name
- * @param corePoolSize the core size
+ * @param corePoolSize the core pool size
* @param maxPoolSize the maximum pool size
* @param keepAliveTime keep alive time
* @param timeUnit keep alive time unit