Author: andygumbrecht
Date: Thu Jan 17 15:09:02 2013
New Revision: 1434706

URL: http://svn.apache.org/viewvc?rev=1434706&view=rev
Log:
Rejection handlers.

Modified:
    
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
    
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
    
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java

Modified: 
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
URL: 
http://svn.apache.org/viewvc/tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java?rev=1434706&r1=1434705&r2=1434706&view=diff
==============================================================================
--- 
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 (original)
+++ 
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 Thu Jan 17 15:09:02 2013
@@ -77,7 +77,7 @@ public class StatelessInstanceManager {
     protected final SafeToolkit toolkit = 
SafeToolkit.getToolkit("StatefulInstanceManager");
     private SecurityService securityService;
     private final Pool.Builder poolBuilder;
-    private final Executor executor;
+    private final ThreadPoolExecutor executor;
 
     public StatelessInstanceManager(final SecurityService securityService, 
final Duration accessTimeout, final Duration closeTimeout, final Pool.Builder 
poolBuilder, final int callbackThreads) {
         this.securityService = securityService;
@@ -105,7 +105,23 @@ public class StatelessInstanceManager {
             }
         });
 
+        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(final Runnable r, final 
ThreadPoolExecutor tpe) {
+
+                if (null == r || null == tpe || tpe.isShutdown() || 
tpe.isTerminated() || tpe.isTerminating()) {
+                    return;
+                }
 
+                try {
+                    if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+                        logger.warning("Executor failed to run asynchronous 
process: " + r);
+                    }
+                } catch (InterruptedException e) {
+                    //Ignore
+                }
+            }
+        });
     }
 
     private class StatelessSupplier implements Pool.Supplier<Instance> {

Modified: 
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
URL: 
http://svn.apache.org/viewvc/tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java?rev=1434706&r1=1434705&r2=1434706&view=diff
==============================================================================
--- 
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
 (original)
+++ 
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
 Thu Jan 17 15:09:02 2013
@@ -121,10 +121,10 @@ public class ExecutorBuilder {
         public BlockingQueue create(final Options options, final String 
prefix, final int queueSize) {
             switch (this) {
                 case ARRAY: {
-                    return new ArrayBlockingQueue(queueSize);
+                    return new ArrayBlockingQueue(queueSize > 0 ? queueSize : 
1);
                 }
                 case LINKED: {
-                    return new LinkedBlockingQueue(queueSize);
+                    return new LinkedBlockingQueue(queueSize > 0 ? queueSize : 
1);
                 }
                 case PRIORITY: {
                     return new PriorityBlockingQueue();

Modified: 
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL: 
http://svn.apache.org/viewvc/tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=1434706&r1=1434705&r2=1434706&view=diff
==============================================================================
--- 
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
 (original)
+++ 
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
 Thu Jan 17 15:09:02 2013
@@ -21,19 +21,8 @@ import org.apache.openejb.monitoring.Man
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * @version $Rev$ $Date$
@@ -48,20 +37,38 @@ public class DiscoveryRegistry implement
     @Managed
     private final Monitor monitor = new Monitor();
 
-    private final Executor executor = new ThreadPoolExecutor(1, 10, 30, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-        @Override
-        public Thread newThread(final Runnable runable) {
-            final Thread t = new Thread(runable, 
DiscoveryRegistry.class.getSimpleName());
-            t.setDaemon(true);
-            return t;
-        }
-    });
+    private final ThreadPoolExecutor executor;
 
     public DiscoveryRegistry() {
         this(null);
     }
 
     public DiscoveryRegistry(final DiscoveryAgent agent) {
+
+        executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(1), new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable runable) {
+                final Thread t = new Thread(runable, 
DiscoveryRegistry.class.getSimpleName());
+                t.setDaemon(true);
+                return t;
+            }
+        });
+
+        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(final Runnable r, final 
ThreadPoolExecutor tpe) {
+                if (null == r || null == tpe || tpe.isShutdown() || 
tpe.isTerminated() || tpe.isTerminating()) {
+                    return;
+                }
+
+                try {
+                    tpe.getQueue().offer(r, 20, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    //Ignore
+                }
+            }
+        });
+
         SystemInstance.get().setComponent(DiscoveryRegistry.class, this);
         SystemInstance.get().setComponent(DiscoveryAgent.class, this);
 


Reply via email to