Author: andygumbrecht
Date: Thu Jan 17 15:09:02 2013
New Revision: 1434706
URL: http://svn.apache.org/viewvc?rev=1434706&view=rev
Log:
Rejection handlers.
Modified:
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
Modified:
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
URL:
http://svn.apache.org/viewvc/tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java?rev=1434706&r1=1434705&r2=1434706&view=diff
==============================================================================
---
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
(original)
+++
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
Thu Jan 17 15:09:02 2013
@@ -77,7 +77,7 @@ public class StatelessInstanceManager {
protected final SafeToolkit toolkit =
SafeToolkit.getToolkit("StatefulInstanceManager");
private SecurityService securityService;
private final Pool.Builder poolBuilder;
- private final Executor executor;
+ private final ThreadPoolExecutor executor;
public StatelessInstanceManager(final SecurityService securityService,
final Duration accessTimeout, final Duration closeTimeout, final Pool.Builder
poolBuilder, final int callbackThreads) {
this.securityService = securityService;
@@ -105,7 +105,23 @@ public class StatelessInstanceManager {
}
});
+ executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor tpe) {
+
+ if (null == r || null == tpe || tpe.isShutdown() ||
tpe.isTerminated() || tpe.isTerminating()) {
+ return; // nothing to queue, or the pool is shutting down
+ }
+ try {
+ if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) { // wait up to 20s for queue space
+ logger.warning("Executor failed to run asynchronous
process: " + r);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt status for the submitting thread
+ }
+ }
+ });
}
private class StatelessSupplier implements Pool.Supplier<Instance> {
Modified:
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
URL:
http://svn.apache.org/viewvc/tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java?rev=1434706&r1=1434705&r2=1434706&view=diff
==============================================================================
---
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
(original)
+++
tomee/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
Thu Jan 17 15:09:02 2013
@@ -121,10 +121,10 @@ public class ExecutorBuilder {
public BlockingQueue create(final Options options, final String
prefix, final int queueSize) {
switch (this) {
case ARRAY: {
- return new ArrayBlockingQueue(queueSize);
+ return new ArrayBlockingQueue(queueSize > 0 ? queueSize :
1);
}
case LINKED: {
- return new LinkedBlockingQueue(queueSize);
+ return new LinkedBlockingQueue(queueSize > 0 ? queueSize :
1);
}
case PRIORITY: {
return new PriorityBlockingQueue();
Modified:
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL:
http://svn.apache.org/viewvc/tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=1434706&r1=1434705&r2=1434706&view=diff
==============================================================================
---
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
(original)
+++
tomee/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
Thu Jan 17 15:09:02 2013
@@ -21,19 +21,8 @@ import org.apache.openejb.monitoring.Man
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
/**
* @version $Rev$ $Date$
@@ -48,20 +37,38 @@ public class DiscoveryRegistry implement
@Managed
private final Monitor monitor = new Monitor();
- private final Executor executor = new ThreadPoolExecutor(1, 10, 30,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- @Override
- public Thread newThread(final Runnable runable) {
- final Thread t = new Thread(runable,
DiscoveryRegistry.class.getSimpleName());
- t.setDaemon(true);
- return t;
- }
- });
+ private final ThreadPoolExecutor executor;
public DiscoveryRegistry() {
this(null);
}
public DiscoveryRegistry(final DiscoveryAgent agent) {
+
+ executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(1), new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable runable) {
+ final Thread t = new Thread(runable,
DiscoveryRegistry.class.getSimpleName());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor tpe) {
+ if (null == r || null == tpe || tpe.isShutdown() ||
tpe.isTerminated() || tpe.isTerminating()) {
+ return; // nothing to queue, or the pool is shutting down
+ }
+
+ try {
+ tpe.getQueue().offer(r, 20, TimeUnit.SECONDS); // wait up to 20s for queue space; result is not checked
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt status for the submitting thread
+ }
+ }
+ });
+
SystemInstance.get().setComponent(DiscoveryRegistry.class, this);
SystemInstance.get().setComponent(DiscoveryAgent.class, this);