Author: davsclaus
Date: Sat Dec 17 11:52:29 2011
New Revision: 1215469
URL: http://svn.apache.org/viewvc?rev=1215469&view=rev
Log:
CAMEL-4786: Add sized scheduled thread pool to ensure scheduled thread pools do
not eat up memory, as the JDK pools are unbounded. Fixed throttler and delayer to
use thread pool profile so end users can customize core pool size with these
EIPs.
Added:
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
- copied unchanged from r1215240,
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
- copied, changed from r1215240,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
- copied unchanged from r1215240,
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 11:52:29 2011
@@ -1 +1 @@
-/camel/trunk:1202148,1202167,1202204-1202206,1202214-1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210391,1210771,1210830,1211363,1211414,1211773,1211811,1212275-1212276,1212408,1213197,1213219,1213232,1213526,1214132,1214639,1214743,1215448
+/camel/trunk:1202148,1202167,1202204-1202206,1202214-1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210391,1210771,1210830,1211363,1211414,1211773,1211811,1212275-1212276,1212408,1213197,1213219,1213232,1213526,1214132,1214639,1214743,1215240,1215448
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
---
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
(original)
+++
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
Sat Dec 17 11:52:29 2011
@@ -38,6 +38,7 @@ import org.apache.camel.spi.LifecycleStr
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,7 +173,11 @@ public class DefaultExecutorServiceStrat
if (poolSize == null) {
poolSize = getDefaultThreadPoolProfile().getPoolSize();
}
- answer = newScheduledThreadPool(source, name, poolSize);
+ Integer maxQueueSize = profile.getMaxQueueSize();
+ if (maxQueueSize == null) {
+ maxQueueSize =
getDefaultThreadPoolProfile().getMaxQueueSize();
+ }
+ answer = newScheduledThreadPool(source, name, poolSize,
maxQueueSize);
if (answer != null) {
LOG.debug("Looking up ScheduledExecutorService with ref:
{} and found a matching ThreadPoolProfile to create the
ScheduledExecutorService: {}",
executorServiceRef, answer);
@@ -224,15 +229,20 @@ public class DefaultExecutorServiceStrat
public ScheduledExecutorService newScheduledThreadPool(Object source,
String name) {
int poolSize = getDefaultThreadPoolProfile().getPoolSize();
- return newScheduledThreadPool(source, name, poolSize);
+ int queueSize = getDefaultThreadPoolProfile().getMaxQueueSize();
+ return newScheduledThreadPool(source, name, poolSize, queueSize);
}
public ScheduledExecutorService newScheduledThreadPool(Object source,
String name, int poolSize) {
- ScheduledExecutorService answer =
ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name,
true);
+ return newScheduledThreadPool(source, name, poolSize, 0);
+ }
+
+ public ScheduledExecutorService newScheduledThreadPool(Object source,
String name, int poolSize, int maxQueueSize) {
+ ScheduledExecutorService answer =
ExecutorServiceHelper.newScheduledThreadPool(poolSize, maxQueueSize,
threadNamePattern, name, true);
onThreadPoolCreated(answer, source, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("Created new scheduled thread pool for source: {} with
name: {}. [poolSize={}]. -> {}", new Object[]{source, name, poolSize, answer});
+ LOG.debug("Created new scheduled thread pool for source: {} with
name: {}. [poolSize={}, maxQueueSize={}]. -> {}", new Object[]{source, name,
poolSize, maxQueueSize, answer});
}
return answer;
}
@@ -392,8 +402,13 @@ public class DefaultExecutorServiceStrat
}
// let lifecycle strategy be notified as well which can let it be
managed in JMX as well
+ ThreadPoolExecutor threadPool = null;
if (executorService instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor threadPool = (ThreadPoolExecutor)
executorService;
+ threadPool = (ThreadPoolExecutor) executorService;
+ } else if (executorService instanceof SizedScheduledExecutorService) {
+ threadPool = ((SizedScheduledExecutorService)
executorService).getScheduledThreadPoolExecutor();
+ }
+ if (threadPool != null) {
for (LifecycleStrategy lifecycle :
camelContext.getLifecycleStrategies()) {
lifecycle.onThreadPoolAdd(camelContext, threadPool, id,
sourceId, routeId, threadPoolProfileId);
}
Modified:
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
---
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Sat Dec 17 11:52:29 2011
@@ -84,6 +84,7 @@ public abstract class DelayProcessorSupp
long delay = calculateDelay(exchange);
if (delay <= 0) {
// no delay then continue routing
+ log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
return super.process(exchange, callback);
}
@@ -113,6 +114,7 @@ public abstract class DelayProcessorSupp
if (!isRunAllowed()) {
exchange.setException(new
RejectedExecutionException());
} else {
+ log.debug("Scheduling rejected task, so letting caller
run, delaying at first for {} millis for exchangeId: {}", delay,
exchange.getExchangeId());
// let caller run by processing
try {
delay(delay, exchange);
Modified:
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
---
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
(original)
+++
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
Sat Dec 17 11:52:29 2011
@@ -165,6 +165,17 @@ public interface ExecutorServiceStrategy
/**
* Creates a new scheduled thread pool.
+ *
+ * @param source the source object, usually it should be
<tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param poolSize the core pool size
+ * @param maxQueueSize the max queue size, use 0 or negative for unbounded
+ * @return the created thread pool
+ */
+ ScheduledExecutorService newScheduledThreadPool(Object source, String
name, int poolSize, int maxQueueSize);
+
+ /**
+ * Creates a new scheduled thread pool.
* <p/>
* Will use the pool size from the default thread pool profile
*
Modified:
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
---
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
(original)
+++
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Sat Dec 17 11:52:29 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -106,7 +107,31 @@ public final class ExecutorServiceHelper
* @return the created pool
*/
public static ScheduledExecutorService newScheduledThreadPool(final int
poolSize, final String pattern, final String name, final boolean daemon) {
- return Executors.newScheduledThreadPool(poolSize, new
CamelThreadFactory(pattern, name, daemon));
+ return newScheduledThreadPool(poolSize, 0, pattern, name, daemon);
+ }
+
+ /**
+ * Creates a new scheduled thread pool which can schedule threads.
+ *
+ * @param poolSize the core pool size
+ * @param maxQueueSize max queue size, use 0 or negative for unbounded
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @param daemon whether the threads is daemon or not
+ * @return the created pool
+ */
+ public static ScheduledExecutorService newScheduledThreadPool(final int
poolSize, final int maxQueueSize, final String pattern, final String name,
final boolean daemon) {
+ ScheduledThreadPoolExecutor answer = new
ScheduledThreadPoolExecutor(poolSize, new CamelThreadFactory(pattern, name,
daemon));
+ // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
+
+ // need to wrap the thread pool in a sized to guard against the
problem that the
+ // JDK created thread pool has an unbounded queue (see class javadoc),
which mean
+ // we could potentially keep adding tasks, and run out of memory.
+ if (maxQueueSize > 0) {
+ return new SizedScheduledExecutorService(answer, maxQueueSize);
+ } else {
+ return answer;
+ }
}
/**
Copied:
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
(from r1215240,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java)
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java?p2=camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java&r1=1215240&r2=1215469&rev=1215469&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
(original)
+++
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
Sat Dec 17 11:52:29 2011
@@ -16,14 +16,15 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolProfileBuilder;
+import org.apache.camel.impl.ThreadPoolProfileSupport;
+import org.apache.camel.management.ManagementTestSupport;
+import org.apache.camel.spi.ThreadPoolProfile;
/**
*
*/
-public class ThrottlerAsyncDelayedCallerRunsTest extends ContextTestSupport {
+public class ThrottlerAsyncDelayedCallerRunsTest extends ManagementTestSupport
{
public void testThrottler() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(6);
@@ -44,9 +45,10 @@ public class ThrottlerAsyncDelayedCaller
@Override
public void configure() throws Exception {
// create a profile for the throttler
- ThreadPoolProfileBuilder builder = new
ThreadPoolProfileBuilder("myThrottler");
- builder.maxQueueSize(2);
-
context.getExecutorServiceManager().registerThreadPoolProfile(builder.build());
+ ThreadPoolProfile profile = new
ThreadPoolProfileSupport("myThrottler");
+ profile.setMaxPoolSize(5);
+ profile.setMaxQueueSize(2);
+
context.getExecutorServiceStrategy().registerThreadPoolProfile(profile);
from("seda:start")
.throttle(1).timePeriodMillis(100)