Author: davsclaus
Date: Sat Dec 17 11:52:29 2011
New Revision: 1215469

URL: http://svn.apache.org/viewvc?rev=1215469&view=rev
Log:
CAMEL-4786: Add sized scheduled thread pool to ensure scheduled thread pools do 
not eat up memory, as the JDK pools are unbounded. Fixed throttler and delayer to 
use thread pool profile so end users can customize the core pool size with these 
EIPs.

Added:
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
      - copied unchanged from r1215240, 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SizedScheduledExecutorService.java
    
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
      - copied, changed from r1215240, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
    
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
      - copied unchanged from r1215240, 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SizedScheduledExecutorServiceTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Dec 17 11:52:29 2011
@@ -1 +1 @@
-/camel/trunk:1202148,1202167,1202204-1202206,1202214-1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210391,1210771,1210830,1211363,1211414,1211773,1211811,1212275-1212276,1212408,1213197,1213219,1213232,1213526,1214132,1214639,1214743,1215448
+/camel/trunk:1202148,1202167,1202204-1202206,1202214-1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210391,1210771,1210830,1211363,1211414,1211773,1211811,1212275-1212276,1212408,1213197,1213219,1213232,1213526,1214132,1214639,1214743,1215240,1215448

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
 Sat Dec 17 11:52:29 2011
@@ -38,6 +38,7 @@ import org.apache.camel.spi.LifecycleStr
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,7 +173,11 @@ public class DefaultExecutorServiceStrat
                 if (poolSize == null) {
                     poolSize = getDefaultThreadPoolProfile().getPoolSize();
                 }
-                answer = newScheduledThreadPool(source, name, poolSize);
+                Integer maxQueueSize = profile.getMaxQueueSize();
+                if (maxQueueSize == null) {
+                    maxQueueSize = 
getDefaultThreadPoolProfile().getMaxQueueSize();
+                }
+                answer = newScheduledThreadPool(source, name, poolSize, 
maxQueueSize);
                 if (answer != null) {
                     LOG.debug("Looking up ScheduledExecutorService with ref: 
{} and found a matching ThreadPoolProfile to create the 
ScheduledExecutorService: {}",
                             executorServiceRef, answer);
@@ -224,15 +229,20 @@ public class DefaultExecutorServiceStrat
 
     public ScheduledExecutorService newScheduledThreadPool(Object source, 
String name) {
         int poolSize = getDefaultThreadPoolProfile().getPoolSize();
-        return newScheduledThreadPool(source, name, poolSize);
+        int queueSize = getDefaultThreadPoolProfile().getMaxQueueSize();
+        return newScheduledThreadPool(source, name, poolSize, queueSize);
     }
 
     public ScheduledExecutorService newScheduledThreadPool(Object source, 
String name, int poolSize) {
-        ScheduledExecutorService answer = 
ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, 
true);
+        return newScheduledThreadPool(source, name, poolSize, 0);
+    }
+
+    public ScheduledExecutorService newScheduledThreadPool(Object source, 
String name, int poolSize, int maxQueueSize) {
+        ScheduledExecutorService answer = 
ExecutorServiceHelper.newScheduledThreadPool(poolSize, maxQueueSize, 
threadNamePattern, name, true);
         onThreadPoolCreated(answer, source, null);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Created new scheduled thread pool for source: {} with 
name: {}. [poolSize={}]. -> {}", new Object[]{source, name, poolSize, answer});
+            LOG.debug("Created new scheduled thread pool for source: {} with 
name: {}. [poolSize={}, maxQueueSize={}]. -> {}", new Object[]{source, name, 
poolSize, maxQueueSize, answer});
         }
         return answer;
     }
@@ -392,8 +402,13 @@ public class DefaultExecutorServiceStrat
         }
 
         // let lifecycle strategy be notified as well which can let it be 
managed in JMX as well
+        ThreadPoolExecutor threadPool = null;
         if (executorService instanceof ThreadPoolExecutor) {
-            ThreadPoolExecutor threadPool = (ThreadPoolExecutor) 
executorService;
+            threadPool = (ThreadPoolExecutor) executorService;
+        } else if (executorService instanceof SizedScheduledExecutorService) {
+            threadPool = ((SizedScheduledExecutorService) 
executorService).getScheduledThreadPoolExecutor();
+        }
+        if (threadPool != null) {
             for (LifecycleStrategy lifecycle : 
camelContext.getLifecycleStrategies()) {
                 lifecycle.onThreadPoolAdd(camelContext, threadPool, id, 
sourceId, routeId, threadPoolProfileId);
             }

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
 Sat Dec 17 11:52:29 2011
@@ -84,6 +84,7 @@ public abstract class DelayProcessorSupp
         long delay = calculateDelay(exchange);
         if (delay <= 0) {
             // no delay then continue routing
+            log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
             return super.process(exchange, callback);
         }
 
@@ -113,6 +114,7 @@ public abstract class DelayProcessorSupp
                     if (!isRunAllowed()) {
                         exchange.setException(new 
RejectedExecutionException());
                     } else {
+                        log.debug("Scheduling rejected task, so letting caller 
run, delaying at first for {} millis for exchangeId: {}", delay, 
exchange.getExchangeId());
                         // let caller run by processing
                         try {
                             delay(delay, exchange);

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
 Sat Dec 17 11:52:29 2011
@@ -165,6 +165,17 @@ public interface ExecutorServiceStrategy
 
     /**
      * Creates a new scheduled thread pool.
+     *
+     * @param source      the source object, usually it should be 
<tt>this</tt> passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @param poolSize    the core pool size
+     * @param maxQueueSize the max queue size, use 0 or negative for unbounded
+     * @return the created thread pool
+     */
+    ScheduledExecutorService newScheduledThreadPool(Object source, String 
name, int poolSize, int maxQueueSize);
+
+    /**
+     * Creates a new scheduled thread pool.
      * <p/>
      * Will use the pool size from the default thread pool profile
      *

Modified: 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1215469&r1=1215468&r2=1215469&view=diff
==============================================================================
--- 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 Sat Dec 17 11:52:29 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -106,7 +107,31 @@ public final class ExecutorServiceHelper
      * @return the created pool
      */
     public static ScheduledExecutorService newScheduledThreadPool(final int 
poolSize, final String pattern, final String name, final boolean daemon) {
-        return Executors.newScheduledThreadPool(poolSize, new 
CamelThreadFactory(pattern, name, daemon));
+        return newScheduledThreadPool(poolSize, 0, pattern, name, daemon);
+    }
+
+    /**
+     * Creates a new scheduled thread pool which can schedule threads.
+     *
+     * @param poolSize the core pool size
+     * @param maxQueueSize max queue size, use 0 or negative for unbounded
+     * @param pattern  pattern of the thread name
+     * @param name     ${name} in the pattern name
+     * @param daemon   whether the threads is daemon or not
+     * @return the created pool
+     */
+    public static ScheduledExecutorService newScheduledThreadPool(final int 
poolSize, final int maxQueueSize, final String pattern, final String name, 
final boolean daemon) {
+        ScheduledThreadPoolExecutor answer = new 
ScheduledThreadPoolExecutor(poolSize, new CamelThreadFactory(pattern, name, 
daemon));
+        // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
+
+        // need to wrap the thread pool in a sized to guard against the 
problem that the
+        // JDK created thread pool has an unbounded queue (see class javadoc), 
which mean
+        // we could potentially keep adding tasks, and run out of memory.
+        if (maxQueueSize > 0) {
+            return new SizedScheduledExecutorService(answer, maxQueueSize);
+        } else {
+            return answer;
+        }
     }
 
     /**

Copied: 
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
 (from r1215240, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java)
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java?p2=camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java&r1=1215240&r2=1215469&rev=1215469&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
 (original)
+++ 
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
 Sat Dec 17 11:52:29 2011
@@ -16,14 +16,15 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.ThreadPoolProfileBuilder;
+import org.apache.camel.impl.ThreadPoolProfileSupport;
+import org.apache.camel.management.ManagementTestSupport;
+import org.apache.camel.spi.ThreadPoolProfile;
 
 /**
  *
  */
-public class ThrottlerAsyncDelayedCallerRunsTest extends ContextTestSupport {
+public class ThrottlerAsyncDelayedCallerRunsTest extends ManagementTestSupport 
{
     
     public void testThrottler() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(6);
@@ -44,9 +45,10 @@ public class ThrottlerAsyncDelayedCaller
             @Override
             public void configure() throws Exception {
                 // create a profile for the throttler
-                ThreadPoolProfileBuilder builder = new 
ThreadPoolProfileBuilder("myThrottler");
-                builder.maxQueueSize(2);
-                
context.getExecutorServiceManager().registerThreadPoolProfile(builder.build());
+                ThreadPoolProfile profile = new 
ThreadPoolProfileSupport("myThrottler");
+                profile.setMaxPoolSize(5);
+                profile.setMaxQueueSize(2);
+                
context.getExecutorServiceStrategy().registerThreadPoolProfile(profile);
                 
                 from("seda:start")
                     .throttle(1).timePeriodMillis(100)


Reply via email to