Author: dblevins
Date: Mon Aug 20 19:28:06 2012
New Revision: 1375177
URL: http://svn.apache.org/viewvc?rev=1375177&view=rev
Log:
AsynchronousPool.QueueType allows for configuration of the BlockingQueue
implementation
OPENEJB-1895 - Refactored @Asynchronous support
TOMEE-382 - configuration for asynch task pool
Modified:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
Modified:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java?rev=1375177&r1=1375176&r2=1375177&view=diff
==============================================================================
---
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
(original)
+++
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
Mon Aug 20 19:28:06 2012
@@ -25,14 +25,19 @@ import org.apache.openejb.util.Duration;
import javax.ejb.EJBException;
import javax.ejb.NoSuchEJBException;
import java.rmi.NoSuchObjectException;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -46,9 +51,10 @@ public class AsynchronousPool {
private final BlockingQueue<Runnable> blockingQueue;
private final ExecutorService executor;
- public AsynchronousPool(String id, int corePoolSize, int maximumPoolSize,
Duration keepAliveTime) {
- this.blockingQueue = new LinkedBlockingQueue<Runnable>();
+ public AsynchronousPool(String id, int corePoolSize, int maximumPoolSize,
Duration keepAliveTime, BlockingQueue<Runnable> blockingQueue) {
+ blockingQueue = new LinkedBlockingQueue<Runnable>();
final TimeUnit unit = (keepAliveTime.getUnit() != null) ?
keepAliveTime.getUnit() : TimeUnit.SECONDS;
+ this.blockingQueue = blockingQueue;
this.executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
@@ -63,8 +69,43 @@ public class AsynchronousPool {
final int corePoolSize = options.get("AsynchronousPool.CorePoolSize",
10);
final int maximumPoolSize =
Math.max(options.get("AsynchronousPool.MaximumPoolSize", 20), corePoolSize);
final Duration keepAliveTime =
options.get("AsynchronousPool.KeepAliveTime", new Duration(60,
TimeUnit.SECONDS));
+ final BlockingQueue queue = options.get("AsynchronousPool.QueueType",
QueueType.LINKED).create(options);
- return new AsynchronousPool(id, corePoolSize, maximumPoolSize,
keepAliveTime);
+ return new AsynchronousPool(id, corePoolSize, maximumPoolSize,
keepAliveTime, queue);
+ }
+
+ private static enum QueueType {
+ ARRAY,
+ DELAY,
+ LINKED,
+ PRIORITY,
+ SYNCHRONOUS;
+
+ public BlockingQueue create(Options options) {
+ switch (this) {
+ case ARRAY: {
+ return new
ArrayBlockingQueue(options.get("AsynchronousPool.QueueSize", 100));
+ }
+ case DELAY: {
+ return new DelayQueue();
+ }
+ case LINKED: {
+ return new
LinkedBlockingQueue(options.get("AsynchronousPool.QueueSize",
Integer.MAX_VALUE));
+ }
+ case PRIORITY: {
+ return new PriorityBlockingQueue();
+ }
+ case SYNCHRONOUS: {
+ return new
SynchronousQueue(options.get("AsynchronousPool.QueueFair", false));
+ }
+ default: {
+ // The Options class will throw an error if the user
supplies an unknown enum string
+ // The only way we can reach this is if we add a new
QueueType element and forget to
+ // implement it in the above switch statement.
+ throw new IllegalArgumentException("Unknown QueueType
type: " + this);
+ }
+ }
+ }
}
public Object invoke(Callable<Object> callable, boolean isVoid) throws
Throwable {