Author: dblevins
Date: Mon Aug 20 19:28:06 2012
New Revision: 1375177

URL: http://svn.apache.org/viewvc?rev=1375177&view=rev
Log:
AsynchronousPool.QueueType allows for configuration of the BlockingQueue 
implementation
OPENEJB-1895 - Refactored @Asynchronous support
TOMEE-382 - configuration for asynch task pool

Modified:
    
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java

Modified: 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java?rev=1375177&r1=1375176&r2=1375177&view=diff
==============================================================================
--- 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
 (original)
+++ 
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
 Mon Aug 20 19:28:06 2012
@@ -25,14 +25,19 @@ import org.apache.openejb.util.Duration;
 import javax.ejb.EJBException;
 import javax.ejb.NoSuchEJBException;
 import java.rmi.NoSuchObjectException;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -46,9 +51,10 @@ public class AsynchronousPool {
     private final BlockingQueue<Runnable> blockingQueue;
     private final ExecutorService executor;
 
-    public AsynchronousPool(String id, int corePoolSize, int maximumPoolSize, 
Duration keepAliveTime) {
-        this.blockingQueue = new LinkedBlockingQueue<Runnable>();
+    public AsynchronousPool(String id, int corePoolSize, int maximumPoolSize, 
Duration keepAliveTime, BlockingQueue<Runnable> blockingQueue) {
+        blockingQueue = new LinkedBlockingQueue<Runnable>();
         final TimeUnit unit = (keepAliveTime.getUnit() != null) ? 
keepAliveTime.getUnit() : TimeUnit.SECONDS;
+        this.blockingQueue = blockingQueue;
         this.executor = new ThreadPoolExecutor(
                 corePoolSize,
                 maximumPoolSize,
@@ -63,8 +69,43 @@ public class AsynchronousPool {
         final int corePoolSize = options.get("AsynchronousPool.CorePoolSize", 
10);
         final int maximumPoolSize = 
Math.max(options.get("AsynchronousPool.MaximumPoolSize", 20), corePoolSize);
         final Duration keepAliveTime = 
options.get("AsynchronousPool.KeepAliveTime", new Duration(60, 
TimeUnit.SECONDS));
+        final BlockingQueue queue = options.get("AsynchronousPool.QueueType", 
QueueType.LINKED).create(options);
 
-        return new AsynchronousPool(id, corePoolSize, maximumPoolSize, 
keepAliveTime);
+        return new AsynchronousPool(id, corePoolSize, maximumPoolSize, 
keepAliveTime, queue);
+    }
+
+    private static enum QueueType {
+        ARRAY,
+        DELAY,
+        LINKED,
+        PRIORITY,
+        SYNCHRONOUS;
+
+        public BlockingQueue create(Options options) {
+            switch (this) {
+                case ARRAY: {
+                    return new 
ArrayBlockingQueue(options.get("AsynchronousPool.QueueSize", 100));
+                }
+                case DELAY: {
+                    return new DelayQueue();
+                }
+                case LINKED: {
+                    return new 
LinkedBlockingQueue(options.get("AsynchronousPool.QueueSize", 
Integer.MAX_VALUE));
+                }
+                case PRIORITY: {
+                    return new PriorityBlockingQueue();
+                }
+                case SYNCHRONOUS: {
+                    return new 
SynchronousQueue(options.get("AsynchronousPool.QueueFair", false));
+                }
+                default: {
+                    // The Options class will throw an error if the user 
supplies an unknown enum string
+                    // The only way we can reach this is if we add a new 
QueueType element and forget to
+                    // implement it in the above switch statement.
+                    throw new IllegalArgumentException("Unknown QueueType 
type: " + this);
+                }
+            }
+        }
     }
 
     public Object invoke(Callable<Object> callable, boolean isVoid) throws 
Throwable {


Reply via email to