Author: gtully
Date: Tue Dec 14 17:28:10 2010
New Revision: 1049184
URL: http://svn.apache.org/viewvc?rev=1049184&view=rev
Log:
resolve: https://issues.apache.org/jira/browse/AMQ-2852 - have async close use
the default thread pool, and have the inactivity monitor include the task runner
in its resource usage so that it can be shut down. Provide an accessor to shut
down the default thread pool so that a webapp can clean up if needed:
org.apache.activemq.thread.DefaultThreadPools#shutdown. Additional tests
cover the same.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=1049184&r1=1049183&r2=1049184&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
Tue Dec 14 17:28:10 2010
@@ -48,5 +48,13 @@ public final class DefaultThreadPools {
public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
return DEFAULT_TASK_RUNNER_FACTORY;
}
+
+ /**
+ * Useful for cleanup when it is known that all brokers and connections are
+ * closed and stopped, e.g. when undeploying from a web container.
+ */
+ public static void shutdown() {
+ DEFAULT_TASK_RUNNER_FACTORY.shutdown();
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=1049184&r1=1049183&r2=1049184&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Tue Dec 14 17:28:10 2010
@@ -41,8 +41,7 @@ import org.apache.commons.logging.LogFac
public class InactivityMonitor extends TransportFilter {
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
- private static final ThreadPoolExecutor ASYNC_TASKS;
-
+ private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
@@ -311,6 +310,7 @@ public class InactivityMonitor extends T
writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
synchronized( InactivityMonitor.class ) {
if( CHECKER_COUNTER == 0 ) {
+ ASYNC_TASKS = createExecutor();
READ_CHECK_TIMER = new Timer("InactivityMonitor
ReadCheck",true);
WRITE_CHECK_TIMER = new Timer("InactivityMonitor
WriteCheck",true);
}
@@ -354,20 +354,22 @@ public class InactivityMonitor extends T
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
+ ASYNC_TASKS.shutdownNow();
+ ASYNC_TASKS = null;
}
}
}
}
+ private ThreadFactory factory = new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "InactivityMonitor Async
Task: "+runnable);
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
- static {
- ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "InactivityMonitor Async
Task: "+runnable);
- thread.setDaemon(true);
- return thread;
- }
- });
+ private ThreadPoolExecutor createExecutor() {
+ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
}
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1049184&r1=1049183&r2=1049184&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Tue Dec 14 17:28:10 2010
@@ -47,6 +47,9 @@ import org.apache.activemq.wireformat.Wi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import static
org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
+
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
@@ -55,7 +58,6 @@ import org.apache.commons.logging.LogFac
*/
public class TcpTransport extends TransportThreadSupport implements Transport,
Service, Runnable {
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
- private static final ThreadPoolExecutor SOCKET_CLOSE;
protected final URI remoteLocation;
protected final URI localLocation;
protected final WireFormat wireFormat;
@@ -516,7 +518,7 @@ public class TcpTransport extends Transp
//closing the socket can hang also
final CountDownLatch latch = new CountDownLatch(1);
- SOCKET_CLOSE.execute(new Runnable() {
+ getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() {
try {
@@ -612,19 +614,6 @@ public class TcpTransport extends Transp
return super.narrow(target);
}
-
- static {
- SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "TcpSocketClose:
"+runnable);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.setDaemon(true);
- return thread;
- }
- });
- }
-
-
public int getReceiveCounter() {
return receiveCounter;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java?rev=1049184&r1=1049183&r2=1049184&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
Tue Dec 14 17:28:10 2010
@@ -28,6 +28,8 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.network.NetworkConnector;
+import static org.apache.activemq.thread.DefaultThreadPools.shutdown;
+
public class VmTransportNetworkBrokerTest extends TestCase {
private static final String VM_BROKER_URI =
@@ -60,10 +62,9 @@ public class VmTransportNetworkBrokerTes
assertTrue("Threads are leaking: " + ThreadExplorer.show("active
sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" +
threadCountAfterSleep,
threadCountAfterSleep < threadCount + 8);
- connection.stop();
+ connection.close();
broker.stop();
broker.waitUntilStopped();
-
}
public void testNoDanglingThreadsAfterStop() throws Exception {
@@ -73,13 +74,22 @@ public class VmTransportNetworkBrokerTes
broker.setSchedulerSupport(true);
broker.setDedicatedTaskRunner(true);
broker.setPersistent(false);
- broker.addConnector("tcp://localhost:61616");
+
broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
broker.start();
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
+ Connection connection = cf.createConnection("system", "manager");
+ connection.start();
+ connection.close();
broker.stop();
broker.waitUntilStopped();
+ shutdown();
+
+ // let it settle
+ TimeUnit.SECONDS.sleep(5);
int threadCountAfterStop = Thread.activeCount();
- assertTrue("Threads are leaking: " + ThreadExplorer.show("active afer
stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" +
threadCountAfterStop,
+ assertTrue("Threads are leaking: " + ThreadExplorer.show("active after
stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" +
threadCountAfterStop,
threadCountAfterStop == threadCount);
}