Author: davsclaus
Date: Wed Sep 5 09:53:23 2012
New Revision: 1381120
URL: http://svn.apache.org/viewvc?rev=1381120&view=rev
Log:
CAMEL-5563: Camel now shutdown thread pools graceful at first and then fallback
to be aggresive as before. Added more logging details during shutdown, as well
logging if the shutdown takes a while. As well if there was any thread pools
when Camel shutdown that wasnt properly shutdown beforehand. The graceful
shutdown uses a 30 sec timeout.
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1381072
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
Wed Sep 5 09:53:23 2012
@@ -77,7 +77,7 @@ public class DataSetConsumer extends Def
super.doStop();
if (executorService != null) {
-
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
executorService = null;
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Wed Sep 5 09:53:23 2012
@@ -306,7 +306,7 @@ public class SedaConsumer extends Servic
protected void doShutdown() throws Exception {
// only shutdown thread pool when we shutdown
if (executor != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Wed Sep 5 09:53:23 2012
@@ -372,7 +372,7 @@ public class SedaEndpoint extends Defaul
}
// shutdown thread pool if it was in use
if (multicastExecutor != null) {
-
getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
+
getCamelContext().getExecutorServiceManager().shutdown(multicastExecutor);
multicastExecutor = null;
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java
Wed Sep 5 09:53:23 2012
@@ -40,6 +40,8 @@ import org.apache.camel.spi.ThreadPoolFa
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.concurrent.CamelThreadFactory;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
@@ -57,6 +59,7 @@ public class DefaultExecutorServiceManag
private ThreadPoolFactory threadPoolFactory = new
DefaultThreadPoolFactory();
private final List<ExecutorService> executorServices = new
ArrayList<ExecutorService>();
private String threadNamePattern;
+ private long shutdownAwaitTermination = 30000;
private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
private final Map<String, ThreadPoolProfile> threadPoolProfiles = new
HashMap<String, ThreadPoolProfile>();
private ThreadPoolProfile builtIndefaultProfile;
@@ -126,7 +129,17 @@ public class DefaultExecutorServiceManag
String name = threadNamePattern.replaceFirst("#camelId#",
this.camelContext.getName());
this.threadNamePattern = name;
}
-
+
+ @Override
+ public long getShutdownAwaitTermination() {
+ return shutdownAwaitTermination;
+ }
+
+ @Override
+ public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
+ this.shutdownAwaitTermination = shutdownAwaitTermination;
+ }
+
@Override
public String resolveThreadName(String name) {
return ThreadHelper.resolveThreadName(threadNamePattern, name);
@@ -244,11 +257,50 @@ public class DefaultExecutorServiceManag
@Override
public void shutdown(ExecutorService executorService) {
ObjectHelper.notNull(executorService, "executorService");
+ shutdown(executorService, shutdownAwaitTermination);
+ }
+ @Override
+ public void shutdown(ExecutorService executorService, long
shutdownAwaitTermination) {
+ ObjectHelper.notNull(executorService, "executorService");
+ if (shutdownAwaitTermination <= 0) {
+ throw new IllegalArgumentException("ShutdownAwaitTermination must
be a positive number, was: " + shutdownAwaitTermination);
+ }
+
+
+ // shutting down a thread pool is a 2 step process. First we try
graceful, and if that fails, then we go more aggressively
+ // and try shutting down again. In both cases we wait at most the
given shutdown timeout value given
+ // (total wait could then be 2 x shutdownAwaitTermination)
+ boolean warned = false;
+ StopWatch watch = new StopWatch();
if (!executorService.isShutdown()) {
- LOG.debug("Shutdown ExecutorService: {}", executorService);
+ LOG.trace("Shutdown of ExecutorService: {} with await termination:
{} millis", executorService, shutdownAwaitTermination);
executorService.shutdown();
- LOG.trace("Shutdown ExecutorService: {} complete.",
executorService);
+ try {
+ if (!awaitTermination(executorService,
shutdownAwaitTermination)) {
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due
first await termination elapsed.", executorService);
+ executorService.shutdownNow();
+ // we are now shutting down aggressively, so wait to see
if we can completely shutdown or not
+ if (!awaitTermination(executorService,
shutdownAwaitTermination)) {
+ LOG.warn("Cannot completely force shutdown of
ExecutorService: {} due second await termination elapsed.", executorService);
+ }
+ }
+ } catch (InterruptedException e) {
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due
interrupted.", executorService);
+ // we were interrupted during shutdown, so force shutdown
+ executorService.shutdownNow();
+ }
+
+ // if we logged at WARN level, then report at INFO level when we
are complete so the end user can see this in the log
+ if (warned) {
+ LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and
terminated: {} took: {}.",
+ new Object[]{executorService,
executorService.isShutdown(), executorService.isTerminated(),
TimeUtils.printDuration(watch.taken())});
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and
terminated: {} took: {}.",
+ new Object[]{executorService,
executorService.isShutdown(), executorService.isTerminated(),
TimeUtils.printDuration(watch.taken())});
+ }
}
if (executorService instanceof ThreadPoolExecutor) {
@@ -262,19 +314,56 @@ public class DefaultExecutorServiceManag
executorServices.remove(executorService);
}
+ /**
+ * Awaits the termination of the thread pool.
+ * <p/>
+ * This implementation will log every 5th second at INFO level that we are
waiting, so the end user
+ * can see we are not hanging in case it takes longer time to shutdown the
pool.
+ *
+ * @param executorService the thread pool
+ * @param shutdownAwaitTermination time in millis to use as timeout
+ * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if
we timed out
+ * @throws InterruptedException is thrown if we are interrupted during the
waiting
+ */
+ private static boolean awaitTermination(ExecutorService executorService,
long shutdownAwaitTermination) throws InterruptedException {
+ // log progress every 5th second so end user is aware of we are
shutting down
+ StopWatch watch = new StopWatch();
+ long interval = Math.min(5000, shutdownAwaitTermination);
+ boolean done = false;
+ while (!done && interval > 0) {
+ if (executorService.awaitTermination(interval,
TimeUnit.MILLISECONDS)) {
+ done = true;
+ } else {
+ LOG.info("Waited {} for ExecutorService: {} to shutdown...",
TimeUtils.printDuration(watch.taken()), executorService);
+ // recalculate interval
+ interval = Math.min(5000, shutdownAwaitTermination -
watch.taken());
+ }
+ }
+
+ return done;
+ }
+
@Override
public List<Runnable> shutdownNow(ExecutorService executorService) {
- return doShutdownNow(executorService, true);
+ return doShutdownNow(executorService, false);
}
- private List<Runnable> doShutdownNow(ExecutorService executorService,
boolean remove) {
+ private List<Runnable> doShutdownNow(ExecutorService executorService,
boolean failSafe) {
ObjectHelper.notNull(executorService, "executorService");
List<Runnable> answer = null;
if (!executorService.isShutdown()) {
- LOG.debug("ShutdownNow ExecutorService: {}", executorService);
+ if (failSafe) {
+ // log as warn, as we shutdown as fail-safe, so end user
should see more details in the log.
+ LOG.warn("Forcing shutdown of ExecutorService: {}",
executorService);
+ } else {
+ LOG.debug("Forcing shutdown of ExecutorService: {}",
executorService);
+ }
answer = executorService.shutdownNow();
- LOG.trace("ShutdownNow ExecutorService: {} complete.",
executorService);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and
terminated: {}.",
+ new Object[]{executorService,
executorService.isShutdown(), executorService.isTerminated()});
+ }
}
if (executorService instanceof ThreadPoolExecutor) {
@@ -285,7 +374,7 @@ public class DefaultExecutorServiceManag
}
// remove reference as its shutdown
- if (remove) {
+ if (!failSafe) {
executorServices.remove(executorService);
}
@@ -316,17 +405,23 @@ public class DefaultExecutorServiceManag
@Override
protected void doShutdown() throws Exception {
- // shutdown all executor services by looping
- for (ExecutorService executorService : executorServices) {
- // only log if something goes wrong as we want to shutdown them all
- try {
- // must not remove during looping, as we clear the list
afterwards
- doShutdownNow(executorService, false);
- } catch (Throwable e) {
- LOG.warn("Error occurred during shutdown of ExecutorService: "
- + executorService + ". This exception will be
ignored.", e);
+ // shutdown all remainder executor services by looping and doing this
aggressively
+ // as by normal all threads pool should have been shutdown using
proper lifecycle
+ // by their EIPs, components etc. This is acting as a fail-safe during
shutdown
+ // of CamelContext itself.
+ if (!executorServices.isEmpty()) {
+ LOG.warn("Shutting down {} ExecutorService's which has not been
shutdown properly (acting as fail-safe)", executorServices.size());
+ for (ExecutorService executorService : executorServices) {
+ // only log if something goes wrong as we want to shutdown
them all
+ try {
+ doShutdownNow(executorService, true);
+ } catch (Throwable e) {
+ LOG.warn("Error occurred during shutdown of
ExecutorService: "
+ + executorService + ". This exception will be
ignored.", e);
+ }
}
}
+ // clear list
executorServices.clear();
// do not clear the default profile as we could potential be restarted
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
Wed Sep 5 09:53:23 2012
@@ -355,7 +355,7 @@ public class DefaultShutdownStrategy ext
@Override
protected void doShutdown() throws Exception {
if (executor != null) {
- camelContext.getExecutorServiceManager().shutdownNow(executor);
+ camelContext.getExecutorServiceManager().shutdown(executor);
// should clear executor so we can restart by creating a new
thread pool
executor = null;
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Wed Sep 5 09:53:23 2012
@@ -360,7 +360,7 @@ public abstract class ScheduledPollConsu
@Override
protected void doShutdown() throws Exception {
if (shutdownExecutor && scheduledExecutorService != null) {
-
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutorService);
scheduledExecutorService = null;
future = null;
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Wed Sep 5 09:53:23 2012
@@ -232,7 +232,7 @@ public abstract class DelayProcessorSupp
@Override
protected void doShutdown() throws Exception {
if (shutdownExecutorService && executorService != null) {
-
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
}
super.doShutdown();
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Wed Sep 5 09:53:23 2012
@@ -965,7 +965,7 @@ public class MulticastProcessor extends
errorHandlers.clear();
if (shutdownExecutorService && executorService != null) {
-
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+
getCamelContext().getExecutorServiceManager().shutdown(executorService);
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
Wed Sep 5 09:53:23 2012
@@ -80,7 +80,7 @@ public class OnCompletionProcessor exten
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(processor);
if (shutdownExecutorService) {
-
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+
getCamelContext().getExecutorServiceManager().shutdown(executorService);
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
Wed Sep 5 09:53:23 2012
@@ -171,7 +171,7 @@ public class ThreadsProcessor extends Se
protected void doShutdown() throws Exception {
if (shutdownExecutorService) {
-
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
}
super.doShutdown();
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
Wed Sep 5 09:53:23 2012
@@ -157,7 +157,7 @@ public class ThroughputLogger extends Se
@Override
public void doStop() throws Exception {
if (logSchedulerService != null) {
-
camelContext.getExecutorServiceManager().shutdownNow(logSchedulerService);
+
camelContext.getExecutorServiceManager().shutdown(logSchedulerService);
logSchedulerService = null;
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
Wed Sep 5 09:53:23 2012
@@ -217,7 +217,7 @@ public class WireTapProcessor extends Se
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(processor);
if (shutdownExecutorService) {
-
destination.getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+
destination.getCamelContext().getExecutorServiceManager().shutdown(executorService);
}
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Wed Sep 5 09:53:23 2012
@@ -917,7 +917,7 @@ public class AggregateProcessor extends
// and is better suited for preparing to shutdown than this doStop
method is
if (recoverService != null) {
-
camelContext.getExecutorServiceManager().shutdownNow(recoverService);
+ camelContext.getExecutorServiceManager().shutdown(recoverService);
}
ServiceHelper.stopServices(timeoutMap, processor,
deadLetterProducerTemplate);
@@ -970,10 +970,10 @@ public class AggregateProcessor extends
inProgressCompleteExchanges.clear();
if (shutdownExecutorService) {
-
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
}
if (shutdownTimeoutCheckerExecutorService) {
-
camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+
camelContext.getExecutorServiceManager().shutdown(timeoutCheckerExecutorService);
timeoutCheckerExecutorService = null;
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
Wed Sep 5 09:53:23 2012
@@ -43,6 +43,11 @@ import org.apache.camel.ShutdownableServ
* If you use the <tt>newXXX</tt> methods to create thread pools, then Camel
will by default take care of
* shutting down those created pools when {@link
org.apache.camel.CamelContext} is shutting down.
* <p/>
+ * For more information about shutting down thread pools see the {@link
#shutdown(java.util.concurrent.ExecutorService)}
+ * and {@link #shutdownNow(java.util.concurrent.ExecutorService)}, and {@link
#getShutdownAwaitTermination()} methods.
+ * Notice the details about using a graceful shutdown at fist, and then
falling back to aggressive shutdown in case
+ * of await termination timeout occurred.
+ *
* @see ThreadPoolFactory
*/
public interface ExecutorServiceManager extends ShutdownableService {
@@ -121,6 +126,26 @@ public interface ExecutorServiceManager
String getThreadNamePattern();
/**
+ * Sets the time to wait for thread pools to shutdown orderly, when
invoking the
+ * {@link #shutdown()} method.
+ * <p/>
+ * The default value is <tt>30000</tt> millis.
+ *
+ * @param timeInMillis time in millis.
+ */
+ void setShutdownAwaitTermination(long timeInMillis);
+
+ /**
+ * Gets the time to wait for thread pools to shutdown orderly, when
invoking the
+ * {@link #shutdown()} method.
+ * <p/>
+ * The default value is <tt>30000</tt> millis.
+ *
+ * @return the timeout value
+ */
+ long getShutdownAwaitTermination();
+
+ /**
* Creates a new thread pool using the default thread pool profile.
*
* @param source the source object, usually it should be <tt>this</tt>
passed in as parameter
@@ -246,15 +271,41 @@ public interface ExecutorServiceManager
ScheduledExecutorService newScheduledThreadPool(Object source, String
name, String profileId);
/**
- * Shutdown the given executor service.
+ * Shutdown the given executor service graceful at first, and then
aggressively
+ * if the await termination timeout was hit.
+ * <p/>
+ * Will try to perform an orderly shutdown by giving the running threads
+ * time to complete tasks, before going more aggressively by doing a
+ * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
+ * forces a shutdown. The {@link #getShutdownAwaitTermination()}
+ * is used as timeout value waiting for orderly shutdown to
+ * complete normally, before going aggressively.
*
* @param executorService the executor service to shutdown
* @see java.util.concurrent.ExecutorService#shutdown()
+ * @see #getShutdownAwaitTermination()
*/
void shutdown(ExecutorService executorService);
/**
- * Shutdown now the given executor service.
+ * Shutdown the given executor service graceful at first, and then
aggressively
+ * if the await termination timeout was hit.
+ * <p/>
+ * Will try to perform an orderly shutdown by giving the running threads
+ * time to complete tasks, before going more aggressively by doing a
+ * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
+ * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
+ * is used as timeout value waiting for orderly shutdown to
+ * complete normally, before going aggressively.
+ *
+ * @param executorService the executor service to shutdown
+ * @param shutdownAwaitTermination timeout in millis to wait for orderly
shutdown
+ * @see java.util.concurrent.ExecutorService#shutdown()
+ */
+ void shutdown(ExecutorService executorService, long
shutdownAwaitTermination);
+
+ /**
+ * Shutdown now the given executor service aggressively.
*
* @param executorService the executor service to shutdown now
* @return list of tasks that never commenced execution
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/CamelThreadFactory.java
Wed Sep 5 09:53:23 2012
@@ -46,6 +46,10 @@ public final class CamelThreadFactory im
return answer;
}
+ public String getName() {
+ return name;
+ }
+
public String toString() {
return "CamelThreadFactory[" + name + "]";
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
Wed Sep 5 09:53:23 2012
@@ -78,4 +78,15 @@ public class RejectableScheduledThreadPo
return new RejectableFutureTask<T>(callable);
}
+ @Override
+ public String toString() {
+ // the thread factory often have more precise details what the thread
pool is used for
+ if (getThreadFactory() instanceof CamelThreadFactory) {
+ String name = ((CamelThreadFactory) getThreadFactory()).getName();
+ return super.toString() + "[" + name + "]";
+ } else {
+ return super.toString();
+ }
+ }
+
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
Wed Sep 5 09:53:23 2012
@@ -84,4 +84,15 @@ public class RejectableThreadPoolExecuto
return new RejectableFutureTask<T>(callable);
}
+ @Override
+ public String toString() {
+ // the thread factory often have more precise details what the thread
pool is used for
+ if (getThreadFactory() instanceof CamelThreadFactory) {
+ String name = ((CamelThreadFactory) getThreadFactory()).getName();
+ return super.toString() + "[" + name + "]";
+ } else {
+ return super.toString();
+ }
+ }
+
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
Wed Sep 5 09:53:23 2012
@@ -303,6 +303,12 @@ public class SizedScheduledExecutorServi
@Override
public String toString() {
- return delegate.toString();
+ // the thread factory often have more precise details what the thread
pool is used for
+ if (delegate.getThreadFactory() instanceof CamelThreadFactory) {
+ String name = ((CamelThreadFactory)
delegate.getThreadFactory()).getName();
+ return super.toString() + "[" + name + "]";
+ } else {
+ return super.toString();
+ }
}
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
(original)
+++
camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
Wed Sep 5 09:53:23 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -456,4 +457,34 @@ public class DefaultExecutorServiceManag
assertTrue(tp.isShutdown());
}
+ // this is a manual test, by looking at the logs
+ public void xxxTestLongShutdownOfThreadPool() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ExecutorService pool =
context.getExecutorServiceManager().newSingleThreadExecutor(this, "Cool");
+
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ log.info("Starting thread");
+
+ // this should take a long time to shutdown
+ try {
+ latch.await(42, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ log.info("Existing thread");
+ }
+ });
+
+ // sleep a bit before shutting down
+ Thread.sleep(3000);
+
+ context.getExecutorServiceManager().shutdown(pool);
+
+ assertTrue(pool.isShutdown());
+ assertTrue(pool.isTerminated());
+ }
+
}
Modified:
camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties
(original)
+++ camel/branches/camel-2.10.x/camel-core/src/test/resources/log4j.properties
Wed Sep 5 09:53:23 2012
@@ -33,6 +33,7 @@ log4j.logger.org.apache.camel.impl.Defau
#log4j.logger.org.apache.camel.component.seda=TRACE
#log4j.logger.org.apache.camel.component.file=TRACE
#log4j.logger.org.apache.camel.impl.DefaultUnitOfWork=TRACE
+#log4j.logger.org.apache.camel.impl.DefaultExecutorServiceManager=TRACE
#log4j.logger.org.apache.camel.component.mock=DEBUG
#log4j.logger.org.apache.camel.component.file=TRACE
#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
Modified:
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
Wed Sep 5 09:53:23 2012
@@ -222,7 +222,7 @@ public class SqsConsumer extends Schedul
protected void doShutdown() throws Exception {
super.doShutdown();
if (scheduledExecutor != null) {
-
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
+
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduledExecutor);
scheduledExecutor = null;
}
}
Modified:
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
Wed Sep 5 09:53:23 2012
@@ -62,7 +62,7 @@ public class HazelcastSedaConsumer exten
@Override
protected void doStop() throws Exception {
if (executor != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
}
super.doStop();
Modified:
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
Wed Sep 5 09:53:23 2012
@@ -148,7 +148,7 @@ public class HdfsProducer extends Defaul
protected void doStop() throws Exception {
super.doStop();
if (scheduler != null) {
-
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduler);
+
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler);
scheduler = null;
}
if (ostream != null) {
Modified:
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
Wed Sep 5 09:53:23 2012
@@ -405,7 +405,7 @@ public class JmsComponent extends Defaul
@Override
protected void doShutdown() throws Exception {
if (asyncStartStopExecutorService != null) {
-
getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+
getCamelContext().getExecutorServiceManager().shutdown(asyncStartStopExecutorService);
asyncStartStopExecutorService = null;
}
super.doShutdown();
Modified:
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
Wed Sep 5 09:53:23 2012
@@ -238,7 +238,7 @@ public abstract class ReplyManagerSuppor
// must also stop executor service
if (executorService != null) {
-
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+ camelContext.getExecutorServiceManager().shutdown(executorService);
executorService = null;
}
}
Modified:
camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-kestrel/src/main/java/org/apache/camel/component/kestrel/KestrelConsumer.java
Wed Sep 5 09:53:23 2012
@@ -86,10 +86,12 @@ public class KestrelConsumer extends Def
log.info("Stopping consumer for " + endpoint.getEndpointUri());
if (pollerExecutor != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(pollerExecutor);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdown(pollerExecutor);
+ pollerExecutor = null;
}
if (handlerExecutor != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(handlerExecutor);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdown(handlerExecutor);
+ handlerExecutor = null;
}
super.doStop();
Modified:
camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
Wed Sep 5 09:53:23 2012
@@ -110,7 +110,7 @@ public class MinaEndpoint extends Defaul
protected void doShutdown() throws Exception {
// shutdown thread pools
for (ExecutorService executor : executors) {
-
getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ getCamelContext().getExecutorServiceManager().shutdown(executor);
}
executors.clear();
super.doShutdown();
Modified:
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
Wed Sep 5 09:53:23 2012
@@ -101,10 +101,12 @@ public class NettyConsumer extends Defau
// and then shutdown the thread pools
if (bossExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ context.getExecutorServiceManager().shutdown(bossExecutor);
+ bossExecutor = null;
}
if (workerExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ context.getExecutorServiceManager().shutdown(workerExecutor);
+ workerExecutor = null;
}
LOG.info("Netty consumer unbound from: " + configuration.getAddress());
Modified:
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Wed Sep 5 09:53:23 2012
@@ -128,10 +128,12 @@ public class NettyProducer extends Defau
// and then shutdown the thread pools
if (bossExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ context.getExecutorServiceManager().shutdown(bossExecutor);
+ bossExecutor = null;
}
if (workerExecutor != null) {
- context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ context.getExecutorServiceManager().shutdown(workerExecutor);
+ workerExecutor = null;
}
super.doStop();
Modified:
camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1381120&r1=1381119&r2=1381120&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
Wed Sep 5 09:53:23 2012
@@ -85,7 +85,7 @@ public class StreamConsumer extends Defa
// important: do not close the stream as it will close the standard
// system.in etc.
if (executor != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
}
lines.clear();