Author: gtully
Date: Wed Dec 15 12:44:47 2010
New Revision: 1049527
URL: http://svn.apache.org/viewvc?rev=1049527&view=rev
Log:
additional broker side improvements for
https://issues.apache.org/jira/browse/AMQ-2852 - have discovery and network
connector and vm async tasks delegate to the default thread pool executor,
serialized the test to ensure shutdown is called once after verification
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Dec 15 12:44:47 2010
@@ -25,9 +25,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,6 +65,8 @@ import org.apache.activemq.command.Shutd
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@@ -92,7 +91,7 @@ import org.apache.commons.logging.LogFac
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge,
BrokerServiceAware {
private static final Log LOG =
LogFactory.getLog(DemandForwardingBridgeSupport.class);
- private static final ThreadPoolExecutor ASYNC_TASKS;
+ private final TaskRunnerFactory asyncTaskRunner =
DefaultThreadPools.getDefaultTaskRunnerFactory();
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker;
protected final Transport remoteBroker;
@@ -251,7 +250,7 @@ public abstract class DemandForwardingBr
}
protected void triggerLocalStartBridge() throws IOException {
- ASYNC_TASKS.execute(new Runnable() {
+ asyncTaskRunner.execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartLocalBridge:
localBroker=" + localBroker);
@@ -267,7 +266,7 @@ public abstract class DemandForwardingBr
}
protected void triggerRemoteStartBridge() throws IOException {
- ASYNC_TASKS.execute(new Runnable() {
+ asyncTaskRunner.execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemotelBridge:
localBroker=" + localBroker);
@@ -391,7 +390,7 @@ public abstract class DemandForwardingBr
try {
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
- ASYNC_TASKS.execute(new Runnable() {
+ asyncTaskRunner.execute(new Runnable() {
public void run() {
try {
localBroker.oneway(new ShutdownInfo());
@@ -433,7 +432,7 @@ public abstract class DemandForwardingBr
LOG.warn("Network connection between " + localBroker + " and "
+ remoteBroker + " shutdown due to a remote error: " + error);
}
LOG.debug("The remote Exception was: " + error, error);
- ASYNC_TASKS.execute(new Runnable() {
+ asyncTaskRunner.execute(new Runnable() {
public void run() {
ServiceSupport.dispose(getControllingService());
}
@@ -647,7 +646,7 @@ public abstract class DemandForwardingBr
if (!disposed.get()) {
LOG.info("Network connection between " + localBroker + " and " +
remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error);
- ASYNC_TASKS.execute(new Runnable() {
+ asyncTaskRunner.execute(new Runnable() {
public void run() {
ServiceSupport.dispose(getControllingService());
}
@@ -674,7 +673,7 @@ public abstract class DemandForwardingBr
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
// continue removal in separate thread to free up this thread for
outstanding responses
- ASYNC_TASKS.execute(new Runnable() {
+ asyncTaskRunner.execute(new Runnable() {
public void run() {
sub.waitForCompletion();
try {
@@ -1277,15 +1276,4 @@ public abstract class DemandForwardingBr
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
-
- static {
- ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "NetworkBridge");
- thread.setDaemon(true);
- return thread;
- }
- });
- }
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Wed Dec 15 12:44:47 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.commons.logging.Log;
@@ -38,8 +39,7 @@ import org.apache.commons.logging.LogFac
*/
public class SimpleDiscoveryAgent implements DiscoveryAgent {
- private final static Log LOG =
LogFactory.getLog(SimpleDiscoveryAgent.class);
- private static final ThreadPoolExecutor ASYNC_TASKS;
+ private final static Log LOG =
LogFactory.getLog(SimpleDiscoveryAgent.class);
private long initialReconnectDelay = 1000;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
@@ -110,14 +110,14 @@ public class SimpleDiscoveryAgent implem
if (event.failed.compareAndSet(false, true)) {
listener.onServiceRemove(event);
- ASYNC_TASKS.execute(new Runnable() {
+ DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new
Runnable() {
public void run() {
// We detect a failed connection attempt because the
service
// fails right
// away.
if (event.connectTime + minConnectTime >
System.currentTimeMillis()) {
- LOG.debug("Failure occured soon after the discovery
event was generated. It will be clasified as a connection failure: "+event);
+ LOG.debug("Failure occurred soon after the discovery
event was generated. It will be classified as a connection failure: "+event);
event.connectFailures++;
@@ -132,7 +132,7 @@ public class SimpleDiscoveryAgent implem
return;
}
- LOG.debug("Waiting "+event.reconnectDelay+" ms
before attepting to reconnect.");
+ LOG.debug("Waiting "+event.reconnectDelay+" ms
before attempting to reconnect.");
sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -163,7 +163,7 @@ public class SimpleDiscoveryAgent implem
event.failed.set(false);
listener.onServiceAdd(event);
}
- });
+ }, "Simple Discovery Agent");
}
}
@@ -214,16 +214,4 @@ public class SimpleDiscoveryAgent implem
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff;
}
-
- static {
- ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "Simple Discovery Agent:
"+runnable);
- thread.setDaemon(true);
- return thread;
- }
- });
- }
-
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Wed Dec 15 12:44:47 2010
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -44,8 +45,6 @@ public class VMTransport implements Tran
private static final Object DISCONNECT = new Object();
private static final AtomicLong NEXT_ID = new AtomicLong(0);
- // still possible to configure dedicated task runner through system
property but not programmatically
- private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new
TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
@@ -331,7 +330,7 @@ public class VMTransport implements Tran
if (async) {
synchronized (lazyInitMutext) {
if (taskRunner == null) {
- taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this,
"VMTransport: " + toString());
+ taskRunner =
DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this,
"VMTransport: " + toString());
}
}
try {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
Wed Dec 15 12:44:47 2010
@@ -27,8 +27,8 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.thread.DefaultThreadPools;
-import static org.apache.activemq.thread.DefaultThreadPools.shutdown;
public class VmTransportNetworkBrokerTest extends TestCase {
@@ -38,8 +38,10 @@ public class VmTransportNetworkBrokerTes
CountDownLatch started = new CountDownLatch(1);
CountDownLatch gotConnection = new CountDownLatch(1);
- public void testNoThreadLeakWithActiveVMConnection() throws Exception {
-
+ public void testNoThreadLeak() throws Exception {
+
+ // with VMConnection and simple discovery network connector
+ int originalThreadCount = Thread.activeCount();
BrokerService broker = new BrokerService();
broker.setDedicatedTaskRunner(true);
broker.setPersistent(false);
@@ -55,43 +57,42 @@ public class VmTransportNetworkBrokerTes
// let it settle
TimeUnit.SECONDS.sleep(5);
- int threadCount = Thread.activeCount();
+ int threadCountAfterStart = Thread.activeCount();
TimeUnit.SECONDS.sleep(30);
int threadCountAfterSleep = Thread.activeCount();
- assertTrue("Threads are leaking: " + ThreadExplorer.show("active
sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" +
threadCountAfterSleep,
- threadCountAfterSleep < threadCount + 8);
+ assertTrue("Threads are leaking: " + ThreadExplorer.show("active
sleep") + ", threadCount=" +threadCountAfterStart + " threadCountAfterSleep=" +
threadCountAfterSleep,
+ threadCountAfterSleep < threadCountAfterStart + 8);
connection.close();
broker.stop();
broker.waitUntilStopped();
- }
- public void testNoDanglingThreadsAfterStop() throws Exception {
+ // testNoDanglingThreadsAfterStop with tcp transport
- int threadCount = Thread.activeCount();
- BrokerService broker = new BrokerService();
+ broker = new BrokerService();
broker.setSchedulerSupport(true);
broker.setDedicatedTaskRunner(true);
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
broker.start();
- ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
- Connection connection = cf.createConnection("system", "manager");
+ cf = new
ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
+ connection = cf.createConnection("system", "manager");
connection.start();
connection.close();
broker.stop();
broker.waitUntilStopped();
- shutdown();
+
+ // must only be called when all brokers and connections are done!
+ DefaultThreadPools.shutdown();
// let it settle
TimeUnit.SECONDS.sleep(5);
int threadCountAfterStop = Thread.activeCount();
- assertTrue("Threads are leaking: " + ThreadExplorer.show("active after
stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" +
threadCountAfterStop,
- threadCountAfterStop == threadCount);
+ assertTrue("Threads are leaking: " + ThreadExplorer.show("active after
stop") + ". originalThreadCount=" + originalThreadCount + "
threadCountAfterStop=" + threadCountAfterStop,
+ threadCountAfterStop == originalThreadCount);
}
-
}