Author: davsclaus
Date: Tue Mar 16 12:02:37 2010
New Revision: 923703
URL: http://svn.apache.org/viewvc?rev=923703&view=rev
Log:
CAMEL-1588: Added rejection policy for thread pool settings. CallerRuns will by
default be used.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
(contents, props changed)
- copied, changed from r923639,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCoreAndMaxPoolTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java
(with props)
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java
(contents, props changed)
- copied, changed from r923645,
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
Removed:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThreadsExternalThreadPoolFactoryBeanTest.xml
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java?rev=923703&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
Tue Mar 16 12:02:37 2010
@@ -0,0 +1,38 @@
+package org.apache.camel;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Represent the kinds of options for rejection handlers for thread pools.
+ * <p/>
+ * These options are used for fine grained thread pool settings, where you
+ * want to control which handler to use when a thread pool cannot execute
+ * a new task.
+ * <p/>
+ * Camel will by default use <tt>CallerRuns</tt>.
+ *
+ * @version $Revision$
+ */
+@XmlType
+@XmlEnum(String.class)
+public enum ThreadPoolRejectedPolicy {
+
+ Abort, CallerRuns, DiscardOldest, Discard;
+
+ public RejectedExecutionHandler asRejectedExecutionHandler() {
+ if (this == Abort) {
+ return new ThreadPoolExecutor.AbortPolicy();
+ } else if (this == CallerRuns) {
+ return new ThreadPoolExecutor.CallerRunsPolicy();
+ } else if (this == DiscardOldest) {
+ return new ThreadPoolExecutor.DiscardOldestPolicy();
+ } else if (this == Discard) {
+ return new ThreadPoolExecutor.DiscardPolicy();
+ }
+ throw new IllegalArgumentException("Unknown ThreadPoolRejectedPolicy:
" + this);
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThreadPoolBuilder.java
Tue Mar 16 12:02:37 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.model.ThreadPoolProfileDefinition;
/**
@@ -63,6 +64,11 @@ public final class ThreadPoolBuilder {
return this;
}
+ public ThreadPoolBuilder rejectedPolicy(ThreadPoolRejectedPolicy
rejectedPolicy) {
+ threadPoolDefinition.rejectedPolicy(rejectedPolicy);
+ return this;
+ }
+
/**
* Lookup a {@link java.util.concurrent.ExecutorService} from the
{@link org.apache.camel.spi.Registry}.
*
@@ -95,7 +101,7 @@ public final class ThreadPoolBuilder {
ExecutorService answer =
camelContext.getExecutorServiceStrategy().newThreadPool(source, name,
threadPoolDefinition.getPoolSize(),
threadPoolDefinition.getMaxPoolSize(),
threadPoolDefinition.getKeepAliveTime(),
threadPoolDefinition.getTimeUnit(),
- threadPoolDefinition.getMaxQueueSize(), false);
+ threadPoolDefinition.getMaxQueueSize(),
threadPoolDefinition.getRejectedExecutionHandler(), false);
return answer;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
Tue Mar 16 12:02:37 2010
@@ -19,6 +19,7 @@ package org.apache.camel.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -83,7 +84,7 @@ public class DefaultExecutorServiceStrat
ExecutorService answer =
ExecutorServiceHelper.newThreadPool(threadNamePattern, name,
defaultThreadPoolProfile.getPoolSize(),
defaultThreadPoolProfile.getMaxPoolSize(),
defaultThreadPoolProfile.getKeepAliveTime(),
defaultThreadPoolProfile.getTimeUnit(),
- defaultThreadPoolProfile.getMaxQueueSize(), false);
+ defaultThreadPoolProfile.getMaxQueueSize(),
defaultThreadPoolProfile.getRejectedExecutionHandler(), false);
onNewExecutorService(answer);
return answer;
}
@@ -119,9 +120,10 @@ public class DefaultExecutorServiceStrat
}
public ExecutorService newThreadPool(Object source, String name, int
corePoolSize, int maxPoolSize, long keepAliveTime,
- TimeUnit timeUnit, int maxQueueSize,
boolean daemon) {
- ExecutorService answer =
ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize,
maxPoolSize,
-
keepAliveTime, timeUnit, maxQueueSize, daemon);
+ TimeUnit timeUnit, int maxQueueSize,
RejectedExecutionHandler rejectedExecutionHandler,
+ boolean daemon) {
+ ExecutorService answer =
ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize,
maxPoolSize, keepAliveTime,
+ timeUnit,
maxQueueSize, rejectedExecutionHandler, daemon);
onNewExecutorService(answer);
return answer;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThreadPoolProfileSupport.java
Tue Mar 16 12:02:37 2010
@@ -16,8 +16,10 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.spi.ThreadPoolProfile;
/**
@@ -31,6 +33,7 @@ public class ThreadPoolProfileSupport im
private Long keepAliveTime = 60L;
private TimeUnit timeUnit = TimeUnit.SECONDS;
private Integer maxQueueSize = -1;
+ private ThreadPoolRejectedPolicy rejectedPolicy;
public Boolean isDefaultProfile() {
return defaultProfile;
@@ -79,4 +82,19 @@ public class ThreadPoolProfileSupport im
public void setMaxQueueSize(Integer maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
+
+ public ThreadPoolRejectedPolicy getRejectedPolicy() {
+ return rejectedPolicy;
+ }
+
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ if (rejectedPolicy != null) {
+ return rejectedPolicy.asRejectedExecutionHandler();
+ }
+ return null;
+ }
+
+ public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+ this.rejectedPolicy = rejectedPolicy;
+ }
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java
Tue Mar 16 12:02:37 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -23,6 +24,7 @@ import javax.xml.bind.annotation.XmlAttr
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.xml.TimeUnitAdapter;
import org.apache.camel.spi.ThreadPoolProfile;
@@ -47,6 +49,8 @@ public class ThreadPoolProfileDefinition
private TimeUnit timeUnit = TimeUnit.SECONDS;
@XmlAttribute()
private Integer maxQueueSize = -1;
+ @XmlAttribute()
+ private ThreadPoolRejectedPolicy rejectedPolicy;
public ThreadPoolProfileDefinition() {
}
@@ -58,6 +62,7 @@ public class ThreadPoolProfileDefinition
setKeepAliveTime(threadPoolProfile.getKeepAliveTime());
setTimeUnit(threadPoolProfile.getTimeUnit());
setMaxQueueSize(threadPoolProfile.getMaxQueueSize());
+ setRejectedPolicy(threadPoolProfile.getRejectedPolicy());
}
public ThreadPoolProfileDefinition poolSize(int poolSize) {
@@ -85,6 +90,11 @@ public class ThreadPoolProfileDefinition
return this;
}
+ public ThreadPoolProfileDefinition rejectedPolicy(ThreadPoolRejectedPolicy
rejectedPolicy) {
+ setRejectedPolicy(rejectedPolicy);
+ return this;
+ }
+
public Boolean isDefaultProfile() {
return defaultProfile;
}
@@ -132,4 +142,19 @@ public class ThreadPoolProfileDefinition
public void setMaxQueueSize(Integer maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
+
+ public ThreadPoolRejectedPolicy getRejectedPolicy() {
+ return rejectedPolicy;
+ }
+
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ if (rejectedPolicy != null) {
+ return rejectedPolicy.asRejectedExecutionHandler();
+ }
+ return null;
+ }
+
+ public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+ this.rejectedPolicy = rejectedPolicy;
+ }
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
Tue Mar 16 12:02:37 2010
@@ -17,6 +17,7 @@
package org.apache.camel.model;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -26,6 +27,7 @@ import javax.xml.bind.annotation.XmlTran
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.camel.Processor;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.builder.xml.TimeUnitAdapter;
import org.apache.camel.processor.ThreadsProcessor;
@@ -56,8 +58,13 @@ public class ThreadsDefinition extends O
@XmlJavaTypeAdapter(TimeUnitAdapter.class)
private TimeUnit units = TimeUnit.SECONDS;
@XmlAttribute
+ private Integer maxQueueSize = -1;
+ @XmlTransient()
private String threadName;
@XmlAttribute
+ private ThreadPoolRejectedPolicy rejectedPolicy;
+
+ @XmlAttribute
private WaitForTaskToComplete waitForTaskToComplete =
WaitForTaskToComplete.IfReplyExpected;
@Override
@@ -73,8 +80,13 @@ public class ThreadsDefinition extends O
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() :
poolSize;
+ RejectedExecutionHandler rejected = null;
+ if (rejectedPolicy != null) {
+ rejected = rejectedPolicy.asRejectedExecutionHandler();
+ }
executorService =
routeContext.getCamelContext().getExecutorServiceStrategy()
- .newThreadPool(this, name, poolSize,
max, getKeepAliveTime(), getUnits(), -1, true);
+ .newThreadPool(this, name, poolSize,
max, getKeepAliveTime(), getUnits(),
+ getMaxQueueSize(),
rejected, true);
}
}
Processor childProcessor = routeContext.createProcessor(this);
@@ -158,6 +170,30 @@ public class ThreadsDefinition extends O
}
/**
+ * Sets the maximum number of tasks in the work queue.
+ * <p/>
+ * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
+ *
+ * @param maxQueueSize the max queue size
+ * @return the builder
+ */
+ public ThreadsDefinition maxQueueSize(int maxQueueSize) {
+ setMaxQueueSize(maxQueueSize);
+ return this;
+ }
+
+ /**
+ * Sets the handler for tasks which cannot be executed by the thread pool.
+ *
+ * @param rejectedPolicy the policy for the handler
+ * @return the builder
+ */
+ public ThreadsDefinition rejectedPolicy(ThreadPoolRejectedPolicy
rejectedPolicy) {
+ setRejectedPolicy(rejectedPolicy);
+ return this;
+ }
+
+ /**
* Sets the thread name to use.
*
* @param threadName the thread name
@@ -237,6 +273,14 @@ public class ThreadsDefinition extends O
this.units = units;
}
+ public Integer getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public void setMaxQueueSize(Integer maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
public String getThreadName() {
return threadName;
}
@@ -244,4 +288,12 @@ public class ThreadsDefinition extends O
public void setThreadName(String threadName) {
this.threadName = threadName;
}
+
+ public ThreadPoolRejectedPolicy getRejectedPolicy() {
+ return rejectedPolicy;
+ }
+
+ public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+ this.rejectedPolicy = rejectedPolicy;
+ }
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
Tue Mar 16 12:02:37 2010
@@ -18,6 +18,7 @@ package org.apache.camel.spi;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -143,6 +144,7 @@ public interface ExecutorServiceStrategy
* Creates a new custom thread pool.
* <p/>
* Will by default use 60 seconds for keep alive time for idle threads.
+ * And use {@link
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as
rejection handler
*
* @param source the source object, usually it should be
<tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
@@ -155,18 +157,21 @@ public interface ExecutorServiceStrategy
/**
* Creates a new custom thread pool.
*
- * @param source the source object, usually it should be
<tt>this</tt> passed in as parameter
- * @param name name which is appended to the thread name
- * @param corePoolSize the core pool size
- * @param maxPoolSize the maximum pool size
- * @param keepAliveTime keep alive time for idle threads
- * @param timeUnit time unit for keep alive time
- * @param maxQueueSize the maximum number of tasks in the queue, use
<tt>Integer.MAX_INT</tt> or <tt>-1</tt> to indicate unbounded
- * @param daemon whether or not the created threads is daemon or not
+ * @param source the source object, usually it should
be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread
name
+ * @param corePoolSize the core pool size
+ * @param maxPoolSize the maximum pool size
+ * @param keepAliveTime keep alive time for idle threads
+ * @param timeUnit time unit for keep alive time
+ * @param maxQueueSize the maximum number of tasks in the
queue, use <tt>Integer.MAX_INT</tt> or <tt>-1</tt> to indicate unbounded
+ * @param rejectedExecutionHandler the handler for tasks which cannot be
executed by the thread pool.
+ * If <tt>null</tt> is provided then
{@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
CallerRunsPolicy} is used.
+ * @param daemon whether or not the created threads is
daemon or not
* @return the created thread pool
*/
ExecutorService newThreadPool(Object source, final String name, int
corePoolSize, int maxPoolSize,
- long keepAliveTime, TimeUnit timeUnit, int
maxQueueSize, boolean daemon);
+ long keepAliveTime, TimeUnit timeUnit, int
maxQueueSize,
+ RejectedExecutionHandler
rejectedExecutionHandler, boolean daemon);
/**
* Shutdown the given executor service.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ThreadPoolProfile.java
Tue Mar 16 12:02:37 2010
@@ -16,8 +16,11 @@
*/
package org.apache.camel.spi;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+
/**
* A profile which defines thread pool settings.
*
@@ -112,4 +115,28 @@ public interface ThreadPoolProfile {
* @param maxQueueSize the max queue size
*/
void setMaxQueueSize(Integer maxQueueSize);
+
+ /**
+ * Gets the handler for tasks which cannot be executed by the thread pool.
+ *
+ * @return the policy for the handler
+ */
+ ThreadPoolRejectedPolicy getRejectedPolicy();
+
+ /**
+ * Gets the handler for tasks which cannot be executed by the thread pool.
+ *
+ * @return the handler, or <tt>null</tt> if none defined
+ */
+ RejectedExecutionHandler getRejectedExecutionHandler();
+
+ /**
+ * Sets the handler for tasks which cannot be executed by the thread pool.
+ *
+ * @param rejectedPolicy the policy for the handler
+ */
+ void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy);
+
+
+
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
Tue Mar 16 12:02:37 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -59,7 +60,7 @@ public final class ExecutorServiceHelper
* Creates a new thread name with the given prefix
*
* @param pattern the pattern
- * @param name the name
+ * @param name the name
* @return the thread name, which is unique
*/
public static String getThreadName(String pattern, String name) {
@@ -67,7 +68,7 @@ public final class ExecutorServiceHelper
pattern = DEFAULT_PATTERN;
}
- String answer = pattern.replaceFirst("\\$\\{counter\\}", "" +
nextThreadCounter());
+ String answer = pattern.replaceFirst("\\$\\{counter\\}", "" +
nextThreadCounter());
answer = answer.replaceFirst("\\$\\{name\\}", name);
if (answer.indexOf("$") > -1 || answer.indexOf("${") > -1 ||
answer.indexOf("}") > -1) {
throw new IllegalArgumentException("Pattern is invalid: " +
pattern);
@@ -117,9 +118,9 @@ public final class ExecutorServiceHelper
/**
* Creates a new single thread pool (usually for background tasks)
*
- * @param pattern pattern of the thread name
- * @param name ${name} in the pattern name
- * @param daemon whether the threads is daemon or not
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @param daemon whether the threads is daemon or not
* @return the created pool
*/
public static ExecutorService newSingleThreadExecutor(final String
pattern, final String name, final boolean daemon) {
@@ -135,9 +136,9 @@ public final class ExecutorServiceHelper
/**
* Creates a new cached thread pool
*
- * @param pattern pattern of the thread name
- * @param name ${name} in the pattern name
- * @param daemon whether the threads is daemon or not
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @param daemon whether the threads is daemon or not
* @return the created pool
*/
public static ExecutorService newCachedThreadPool(final String pattern,
final String name, final boolean daemon) {
@@ -153,32 +154,36 @@ public final class ExecutorServiceHelper
/**
* Creates a new custom thread pool using 60 seconds as keep alive and
with an unbounded queue.
*
- * @param pattern pattern of the thread name
- * @param name ${name} in the pattern name
- * @param corePoolSize the core size
- * @param maxPoolSize the maximum pool size
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @param corePoolSize the core size
+ * @param maxPoolSize the maximum pool size
* @return the created pool
*/
public static ExecutorService newThreadPool(final String pattern, final
String name, int corePoolSize, int maxPoolSize) {
- return ExecutorServiceHelper.newThreadPool(pattern, name,
corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, -1, true);
+ return ExecutorServiceHelper.newThreadPool(pattern, name,
corePoolSize, maxPoolSize, 60,
+ TimeUnit.SECONDS, -1, new
ThreadPoolExecutor.CallerRunsPolicy(), true);
}
/**
* Creates a new custom thread pool
*
- * @param pattern pattern of the thread name
- * @param name ${name} in the pattern name
- * @param corePoolSize the core size
- * @param maxPoolSize the maximum pool size
- * @param keepAliveTime keep alive
- * @param timeUnit keep alive time unit
- * @param maxQueueSize the maximum number of tasks in the queue, use
<tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
- * @param daemon whether the threads is daemon or not
+ * @param pattern pattern of the thread name
+ * @param name ${name} in the pattern name
+ * @param corePoolSize the core size
+ * @param maxPoolSize the maximum pool size
+ * @param keepAliveTime keep alive time
+ * @param timeUnit keep alive time unit
+ * @param maxQueueSize the maximum number of tasks in the
queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+ * @param rejectedExecutionHandler the handler for tasks which cannot be
executed by the thread pool.
+ * If <tt>null</tt> is provided then
{@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
CallerRunsPolicy} is used.
+ * @param daemon whether the threads is daemon or not
* @return the created pool
* @throws IllegalArgumentException if parameters is not valid
*/
public static ExecutorService newThreadPool(final String pattern, final
String name, int corePoolSize, int maxPoolSize,
- long keepAliveTime, TimeUnit
timeUnit, int maxQueueSize, final boolean daemon) {
+ long keepAliveTime, TimeUnit
timeUnit, int maxQueueSize,
+ RejectedExecutionHandler
rejectedExecutionHandler, final boolean daemon) {
// validate max >= core
if (maxPoolSize < corePoolSize) {
@@ -203,6 +208,10 @@ public final class ExecutorServiceHelper
return answer;
}
});
+ if (rejectedExecutionHandler == null) {
+ rejectedExecutionHandler = new
ThreadPoolExecutor.CallerRunsPolicy();
+ }
+ answer.setRejectedExecutionHandler(rejectedExecutionHandler);
return answer;
}
@@ -211,14 +220,14 @@ public final class ExecutorServiceHelper
* <p/>
* This method will lookup for configured thread pool in the following
order
* <ul>
- * <li>from the definition if any explicit configured executor
service.</li>
- * <li>if none found, then <tt>null</tt> is returned.</li>
+ * <li>from the definition if any explicit configured executor
service.</li>
+ * <li>if none found, then <tt>null</tt> is returned.</li>
* </ul>
* The various {@link ExecutorServiceAwareDefinition} should use this
helper method to ensure they support
* configured executor services in the same coherent way.
*
- * @param routeContext the rout context
- * @param definition the node definition which may leverage executor
service.
+ * @param routeContext the rout context
+ * @param definition the node definition which may leverage executor
service.
* @return the configured executor service, or <tt>null</tt> if none was
configured.
* @throws IllegalArgumentException is thrown if lookup of executor
service in {@link org.apache.camel.spi.Registry} was not found
*/
Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index
(original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/jaxb.index Tue
Mar 16 12:02:37 2010
@@ -17,5 +17,6 @@
ExchangePattern
LoggingLevel
ManagementStatisticsLevel
+ThreadPoolRejectedPolicy
ShutdownRoute
ShutdownRunningTask
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ThreadPoolBuilderTest.java
Tue Mar 16 12:02:37 2010
@@ -21,6 +21,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.impl.JndiRegistry;
/**
@@ -110,7 +111,9 @@ public class ThreadPoolBuilderTest exten
public void testThreadPoolBuilderAll() throws Exception {
ThreadPoolBuilder builder = new ThreadPoolBuilder(context);
ExecutorService executor =
builder.poolSize(50).maxPoolSize(100).maxQueueSize(2000)
-
.keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS).build(this, "myPool");
+ .keepAliveTime(20000).timeUnit(TimeUnit.MILLISECONDS)
+ .rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
+ .build(this, "myPool");
assertNotNull(executor);
assertEquals(false, executor.isShutdown());
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
(from r923639,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCoreAndMaxPoolTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCoreAndMaxPoolTest.java&r1=923639&r2=923703&rev=923703&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsCoreAndMaxPoolTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
Tue Mar 16 12:02:37 2010
@@ -22,9 +22,9 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class ThreadsCoreAndMaxPoolTest extends ContextTestSupport {
+public class ThreadsMaxQueueSizeTest extends ContextTestSupport {
- public void testThreadsCoreAndMaxPool() throws Exception {
+ public void testThreadsMaxQueueSize() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:start", "Hello World");
@@ -32,7 +32,7 @@ public class ThreadsCoreAndMaxPoolTest e
assertMockEndpointsSatisfied();
}
- public void testThreadsCoreAndMaxPoolBuilder() throws Exception {
+ public void testThreadsMaxQueueSizeBuilder() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:foo", "Hello World");
@@ -47,12 +47,13 @@ public class ThreadsCoreAndMaxPoolTest e
public void configure() throws Exception {
from("direct:start")
// will use a a custom thread pool with 5 in core and 10
as max
- .threads(5, 10)
+ // and a max task queue with 2000
+ .threads(5, 10).maxQueueSize(2000)
.to("mock:result");
from("direct:foo")
// using the builder style
- .threads().poolSize(5).maxPoolSize(10).threadName("myPool")
+ .threads().poolSize(5).maxPoolSize(10).maxQueueSize(2000).threadName("myPool")
.to("mock:result");
}
};
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsMaxQueueSizeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java?rev=923703&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java
Tue Mar 16 12:02:37 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+import static org.apache.camel.ThreadPoolRejectedPolicy.DiscardOldest;
+
+/**
+ * @version $Revision$
+ */
+public class ThreadsRejectedPolicyTest extends ContextTestSupport {
+
+ public void testThreadsRejectedPolicy() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testThreadsRejectedPolicyBuilder() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:foo", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .threads(5, 10).rejectedPolicy(DiscardOldest)
+ .to("mock:result");
+
+ from("direct:foo")
+ // using the builder style
+ .threads().poolSize(5).maxPoolSize(10).rejectedPolicy(DiscardOldest).threadName("myPool")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedPolicyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
(original)
+++
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Tue Mar 16 12:02:37 2010
@@ -139,7 +139,7 @@ public class CamelContextFactoryBean ext
@XmlElement(name = "routeBuilder", required = false)
private List<RouteBuilderDefinition> builderRefs = new
ArrayList<RouteBuilderDefinition>();
@XmlElement(name = "threadPool", required = false)
- private List<CamelExecutorServiceFactoryBean> threadPools;
+ private List<CamelThreadPoolFactoryBean> threadPools;
@XmlElement(name = "endpoint", required = false)
private List<CamelEndpointFactoryBean> endpoints;
@XmlElement(name = "dataFormats", required = false)
Copied:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java
(from r923645,
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java?p2=camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java&p1=camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java&r1=923645&r2=923703&rev=923703&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelExecutorServiceFactoryBean.java
(original)
+++
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java
Tue Mar 16 12:02:37 2010
@@ -17,6 +17,7 @@
package org.apache.camel.spring;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -27,6 +28,7 @@ import javax.xml.bind.annotation.adapter
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.xml.TimeUnitAdapter;
import org.apache.camel.model.IdentifiedType;
import org.apache.camel.spring.util.CamelContextResolverHelper;
@@ -43,7 +45,7 @@ import static org.apache.camel.util.Obje
*/
@XmlRootElement(name = "threadPool")
@XmlAccessorType(XmlAccessType.FIELD)
-public class CamelExecutorServiceFactoryBean extends IdentifiedType implements
FactoryBean, CamelContextAware, ApplicationContextAware {
+public class CamelThreadPoolFactoryBean extends IdentifiedType implements
FactoryBean, CamelContextAware, ApplicationContextAware {
@XmlAttribute
private Integer poolSize;
@@ -57,6 +59,8 @@ public class CamelExecutorServiceFactory
@XmlAttribute
private Integer maxQueueSize = -1;
@XmlAttribute
+ private ThreadPoolRejectedPolicy rejectedPolicy;
+ @XmlAttribute
private String threadName;
@XmlAttribute
private Boolean daemon = Boolean.TRUE;
@@ -82,8 +86,12 @@ public class CamelExecutorServiceFactory
} else {
// use a custom pool based on the settings
int max = getMaxPoolSize() != null ? getMaxPoolSize() :
getPoolSize();
- answer = camelContext.getExecutorServiceStrategy()
- .newThreadPool(getId(), name, getPoolSize(), max,
getKeepAliveTime(), getUnits(), getMaxQueueSize(), isDaemon());
+ RejectedExecutionHandler rejected = null;
+ if (rejectedPolicy != null) {
+ rejected = rejectedPolicy.asRejectedExecutionHandler();
+ }
+ answer = camelContext.getExecutorServiceStrategy().newThreadPool(getId(), name, getPoolSize(), max,
+ getKeepAliveTime(), getUnits(), getMaxQueueSize(), rejected, isDaemon());
}
return answer;
}
@@ -136,6 +144,14 @@ public class CamelExecutorServiceFactory
this.maxQueueSize = maxQueueSize;
}
+ public ThreadPoolRejectedPolicy getRejectedPolicy() {
+ return rejectedPolicy;
+ }
+
+ public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
+ this.rejectedPolicy = rejectedPolicy;
+ }
+
public String getThreadName() {
return threadName;
}
Propchange:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelThreadPoolFactoryBean.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
(original)
+++
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
Tue Mar 16 12:02:37 2010
@@ -39,7 +39,7 @@ import org.apache.camel.spring.CamelBean
import org.apache.camel.spring.CamelConsumerTemplateFactoryBean;
import org.apache.camel.spring.CamelContextFactoryBean;
import org.apache.camel.spring.CamelEndpointFactoryBean;
-import org.apache.camel.spring.CamelExecutorServiceFactoryBean;
+import org.apache.camel.spring.CamelThreadPoolFactoryBean;
import org.apache.camel.spring.CamelJMXAgentDefinition;
import org.apache.camel.spring.CamelProducerTemplateFactoryBean;
import org.apache.camel.spring.CamelPropertyPlaceholderDefinition;
@@ -94,7 +94,7 @@ public class CamelNamespaceHandler exten
addBeanDefinitionParser("consumerTemplate",
CamelConsumerTemplateFactoryBean.class, true);
addBeanDefinitionParser("export", CamelServiceExporter.class, true);
addBeanDefinitionParser("endpoint", CamelEndpointFactoryBean.class,
true);
- addBeanDefinitionParser("threadPool",
CamelExecutorServiceFactoryBean.class, true);
+ addBeanDefinitionParser("threadPool",
CamelThreadPoolFactoryBean.class, true);
// jmx agent and property placeholder cannot be used outside of the
camel context
addBeanDefinitionParser("jmxAgent", CamelJMXAgentDefinition.class,
false);
Modified:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThreadsExternalThreadPoolFactoryBeanTest.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThreadsExternalThreadPoolFactoryBeanTest.xml?rev=923703&r1=923702&r2=923703&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThreadsExternalThreadPoolFactoryBeanTest.xml
(original)
+++
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThreadsExternalThreadPoolFactoryBeanTest.xml
Tue Mar 16 12:02:37 2010
@@ -30,6 +30,7 @@
maxQueueSize="2000"
threadName="myPool"
keepAliveTime="30"
+ rejectedPolicy="DiscardOldest"
units="SECONDS"
daemon="true"
xmlns="http://camel.apache.org/schema/spring"/>