Author: chirino
Date: Mon Apr 17 10:04:59 2006
New Revision: 394729

URL: http://svn.apache.org/viewcvs?rev=394729&view=rev
Log:
An async error could cause a deadlock when using the VM transport since all 
it's operations are sync.  The error handling is now done in an async thread to 
avoid the deadlock.


Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=394729&r1=394728&r2=394729&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 Mon Apr 17 10:04:59 2006
@@ -88,12 +88,19 @@
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingDeque;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 
 public class ActiveMQConnection implements Connection, TopicConnection, 
QueueConnection, StatsCapable, Closeable,  StreamConnection, TransportListener {
 
     public static final TaskRunnerFactory SESSION_TASK_RUNNER = new 
TaskRunnerFactory("session 
Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+    private final Executor asyncConnectionThread;
 
     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
     private static final IdGenerator connectionIdGenerator = new IdGenerator();
@@ -165,6 +172,14 @@
      */
     protected ActiveMQConnection(Transport transport, JMSStatsImpl 
factoryStats)
             throws Exception {
+       
+       // Configure a single threaded executor who's core thread can timeout 
if idle
+        asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, 
new LinkedBlockingQueue(), new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "Connection task");
+            }});
+        asyncConnectionThread.allowCoreThreadTimeOut(true);
+        
         this.info = new ConnectionInfo(new 
ConnectionId(connectionIdGenerator.generateId()));
         this.info.setManageable(true);
         this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
@@ -1388,7 +1403,7 @@
     /**
      * @param command - the command to consume
      */
-    public void onCommand(Command command) {
+    public void onCommand(final Command command) {
         if (!closed.get() && command != null) {
             if (command.isMessageDispatch()) {
                 MessageDispatch md = (MessageDispatch) command;
@@ -1416,7 +1431,13 @@
                 onControlCommand((ControlCommand) command);
             }
             else if (command.getDataStructureType() == 
ConnectionError.DATA_STRUCTURE_TYPE) {
-                onAsyncException(((ConnectionError)command).getException());
+                asyncConnectionThread.execute(new Runnable(){
+                    public void run() {
+                        
onAsyncException(((ConnectionError)command).getException());
+                    }
+                });
+                new Thread("Async error worker") {
+                }.start();
             }else if (command instanceof ConnectionControl){
                 onConnectionControl((ConnectionControl) command);
             }else if (command instanceof ConsumerControl){
@@ -1437,25 +1458,37 @@
     public void onAsyncException(Throwable error) {
         if (!closed.get() && !closing.get()) {
             if (this.exceptionListener != null) {
+                
                 if (!(error instanceof JMSException))
                     error = JMSExceptionSupport.create(error);
-                this.exceptionListener.onException((JMSException) error);
+                final JMSException e = (JMSException) error;
+                
+                asyncConnectionThread.execute(new Runnable(){
+                    public void run() {
+                        
ActiveMQConnection.this.exceptionListener.onException(e);
+                    }
+                });
+                
             } else {
                 log.warn("Async exception with no exception listener: " + 
error, error);
             }
         }
     }
     
-    public void onException(IOException error) {
+    public void onException(final IOException error) {
         onAsyncException(error);
-        transportFailed(error);
-        ServiceSupport.dispose(this.transport);
-        brokerInfoReceived.countDown();
-
-        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
-            TransportListener listener = (TransportListener) iter.next();
-            listener.onException(error);
-        }
+        asyncConnectionThread.execute(new Runnable(){
+            public void run() {
+                transportFailed(error);
+                ServiceSupport.dispose(ActiveMQConnection.this.transport);
+                brokerInfoReceived.countDown();
+        
+                for (Iterator iter = transportListeners.iterator(); 
iter.hasNext();) {
+                    TransportListener listener = (TransportListener) 
iter.next();
+                    listener.onException(error);
+                }
+            }
+        });
     }
     
     public void transportInterupted() {
@@ -1781,4 +1814,4 @@
 
 
     
-}
\ No newline at end of file
+}


Reply via email to