Author: chirino
Date: Fri Feb 26 18:40:31 2010
New Revision: 916780
URL: http://svn.apache.org/viewvc?rev=916780&view=rev
Log:
Updated the TransportConnector and TransportConnection to use a thread pool
when initalizing and destroying connection to better support fast
connect and disconnect use cases.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 26 18:40:31 2010
@@ -77,6 +77,7 @@
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
+import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -91,6 +92,8 @@
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
/**
* @version $Revision: 1.8 $
*/
@@ -908,8 +911,7 @@
cs.getContext().getStopping().set(true);
}
try {
- new Thread("ActiveMQ Transport Stopper: " +
transport.getRemoteAddress()) {
- @Override
+ getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
serviceLock.writeLock().lock();
try {
@@ -922,7 +924,7 @@
serviceLock.writeLock().unlock();
}
}
- }.start();
+ });
} catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not
waiting for stop to complete, reason:", t);
stopped.countDown();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Fri Feb 26 18:40:31 2010
@@ -21,6 +21,7 @@
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
+import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@@ -32,6 +33,9 @@
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.thread.DefaultThreadPools.*;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -202,9 +206,7 @@
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
- // Starting the connection could block due to
- // wireformat negotiation, so start it in an async thread.
- Thread startThread = new Thread("ActiveMQ Transport
Initiator: " + transport.getRemoteAddress()) {
+ getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
try {
Connection connection =
createConnection(transport);
@@ -214,8 +216,7 @@
onAcceptError(e);
}
}
- };
- startThread.start();
+ });
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
Fri Feb 26 18:40:31 2010
@@ -26,24 +26,24 @@
*/
public final class DefaultThreadPools {
- private static final Executor DEFAULT_POOL;
- static {
- DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ActiveMQ Default Thread
Pool Thread");
- thread.setDaemon(true);
- return thread;
- }
- });
- }
+// private static final Executor DEFAULT_POOL;
+// static {
+// DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new
ThreadFactory() {
+// public Thread newThread(Runnable runnable) {
+// Thread thread = new Thread(runnable, "ActiveMQ Default
Thread Pool Thread");
+// thread.setDaemon(true);
+// return thread;
+// }
+// });
+// }
private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new
TaskRunnerFactory();
private DefaultThreadPools() {
}
- public static Executor getDefaultPool() {
- return DEFAULT_POOL;
- }
+// public static Executor getDefaultPool() {
+// return DEFAULT_POOL;
+// }
public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
return DEFAULT_TASK_RUNNER_FACTORY;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Fri Feb 26 18:40:31 2010
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.thread;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -31,7 +32,7 @@
*
* @version $Revision: 1.5 $
*/
-public class TaskRunnerFactory {
+public class TaskRunnerFactory implements Executor {
private ExecutorService executor;
private int maxIterationsPerRun;
@@ -80,6 +81,18 @@
}
}
+ public void execute(Runnable runnable) {
+ execute(runnable, "ActiveMQ Task");
+ }
+
+ public void execute(Runnable runnable, String name) {
+ if (executor != null) {
+ executor.execute(runnable);
+ } else {
+ new Thread(runnable, name).start();
+ }
+ }
+
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=916780&r1=916779&r2=916780&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Fri Feb 26 18:40:31 2010
@@ -20,8 +20,11 @@
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* The SelectorManager will manage one Selector and the thread that checks the
@@ -36,17 +39,21 @@
public static final SelectorManager SINGLETON = new SelectorManager();
- private Executor selectorExecutor = Executors.newCachedThreadPool(new
ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread rc = new Thread(r);
- rc.setName("NIO Transport Thread");
- return rc;
- }
- });
+ private Executor selectorExecutor = createDefaultExecutor();
private Executor channelExecutor = selectorExecutor;
private LinkedList<SelectorWorker> freeWorkers = new
LinkedList<SelectorWorker>();
private int maxChannelsPerWorker = 64;
+ protected ExecutorService createDefaultExecutor() {
+ ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable, "ActiveMQ NIO Worker");
+ }
+ });
+ // rc.allowCoreThreadTimeOut(true);
+ return rc;
+ }
+
public static SelectorManager getInstance() {
return SINGLETON;
}