Author: rajdavies
Date: Mon Mar 17 06:30:38 2008
New Revision: 637879

URL: http://svn.apache.org/viewvc?rev=637879&view=rev
Log:
Send shutdown to transports asynchronously, as they may be blocked.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=637879&r1=637878&r2=637879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Mar 17 06:30:38 2008
@@ -21,6 +21,10 @@
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -75,7 +79,7 @@
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
     
     private static final Log LOG = 
LogFactory.getLog(DemandForwardingBridge.class);
-    
+    private static final ThreadPoolExecutor STOP_TASKS;
     protected final Transport localBroker;
     protected final Transport remoteBroker;
     protected final IdGenerator idGenerator = new IdGenerator();
@@ -113,6 +117,7 @@
     private boolean createdByDuplex;
     private BrokerInfo localBrokerInfo;
     private BrokerInfo remoteBrokerInfo;
+    
 
     private AtomicBoolean started = new AtomicBoolean();
 
@@ -331,10 +336,23 @@
                 try {
                     disposed = true;
                     remoteBridgeStarted.set(false);
-                    localBroker.oneway(new ShutdownInfo());
-                    remoteBroker.oneway(new ShutdownInfo());
-                } catch (IOException e) {
-                    LOG.debug("Caught exception stopping", e);
+                    final CountDownLatch sendShutdown = new CountDownLatch(1);
+                    STOP_TASKS.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                localBroker.oneway(new ShutdownInfo());
+                                remoteBroker.oneway(new ShutdownInfo());
+                            } catch (Throwable e) {
+                                LOG.debug("Caught exception sending shutdown", 
e);
+                            }finally {
+                                sendShutdown.countDown();
+                            }
+                            
+                        }
+                    });
+                    if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) {
+                        LOG.debug("Network Could not shutdown in a timely 
manner");
+                    }
                 } finally {
                     ServiceStopper ss = new ServiceStopper();
                     ss.stop(localBroker);
@@ -636,7 +654,7 @@
                     }
                 }
             } catch (Throwable e) {
-               e.printStackTrace();
+                LOG.warn("Caught an exception processing local command",e);
                 serviceLocalException(e);
             }
         }
@@ -949,6 +967,16 @@
     
     protected boolean isDuplex() {
         return configuration.isDuplex() || createdByDuplex;
+    }
+    
+    static {
+        STOP_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, 
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "NetworkBridge: 
"+runnable);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
     }
 
 }


Reply via email to