Author: rajdavies
Date: Mon Mar 17 06:30:38 2008
New Revision: 637879
URL: http://svn.apache.org/viewvc?rev=637879&view=rev
Log:
send shutdown to transports asynchronously - as they may be blocked
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=637879&r1=637878&r2=637879&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Mon Mar 17 06:30:38 2008
@@ -21,6 +21,10 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -75,7 +79,7 @@
public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
private static final Log LOG =
LogFactory.getLog(DemandForwardingBridge.class);
-
+ private static final ThreadPoolExecutor STOP_TASKS;
protected final Transport localBroker;
protected final Transport remoteBroker;
protected final IdGenerator idGenerator = new IdGenerator();
@@ -113,6 +117,7 @@
private boolean createdByDuplex;
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
+
private AtomicBoolean started = new AtomicBoolean();
@@ -331,10 +336,23 @@
try {
disposed = true;
remoteBridgeStarted.set(false);
- localBroker.oneway(new ShutdownInfo());
- remoteBroker.oneway(new ShutdownInfo());
- } catch (IOException e) {
- LOG.debug("Caught exception stopping", e);
+ final CountDownLatch sendShutdown = new CountDownLatch(1);
+ STOP_TASKS.execute(new Runnable() {
+ public void run() {
+ try {
+ localBroker.oneway(new ShutdownInfo());
+ remoteBroker.oneway(new ShutdownInfo());
+ } catch (Throwable e) {
+ LOG.debug("Caught exception sending shutdown",
e);
+ }finally {
+ sendShutdown.countDown();
+ }
+
+ }
+ });
+ if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) {
+ LOG.debug("Network Could not shutdown in a timely
manner");
+ }
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker);
@@ -636,7 +654,7 @@
}
}
} catch (Throwable e) {
- e.printStackTrace();
+ LOG.warn("Caught an exception processing local command",e);
serviceLocalException(e);
}
}
@@ -949,6 +967,16 @@
protected boolean isDuplex() {
return configuration.isDuplex() || createdByDuplex;
+ }
+
+ static {
+ STOP_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "NetworkBridge:
"+runnable);
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
}
}