Author: chirino
Date: Wed Nov 14 19:19:21 2012
New Revision: 1409308

URL: http://svn.apache.org/viewvc?rev=1409308&view=rev
Log:
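Fixing AMQP implementation bugs: transport failures are now only processed while the transport is CONNECTED, the connection is stopped when the transport fails without a graceful close, and the failure cause is recorded even when no listener is set.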

Modified:
    
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java

Modified: 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1409308&r1=1409307&r2=1409308&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
 Wed Nov 14 19:19:21 2012
@@ -549,10 +549,11 @@ class AmqpProtocolHandler extends Protoc
     }
 
     override def processTransportFailure(error: IOException) {
+      on_transport_disconnected()
       if( !gracefully_closed ) {
         connection_log.info("Shutting connection '%s'  down due to: %s", 
security_context.remote_address, error)
+        connection.stop(NOOP)
       }
-      on_transport_disconnected()
     }
 
     def processConnectionClose(conn: engine.Connection, onComplete: Task) {

Modified: 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1409308&r1=1409307&r2=1409308&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
 Wed Nov 14 19:19:21 2012
@@ -477,16 +477,18 @@ public class AmqpTransport extends Watch
 
         @Override
         public void onTransportFailure(IOException error) {
-            if( state!=CONNECTED ) {
+            if( state==CONNECTED ) {
                 failure = error;
-                listener.processTransportFailure(error);
-                fireWatches();
+                if( listener!=null ) {
+                    listener.processTransportFailure(error);
+                    fireWatches();
+                }
             }
         }
 
         void onFailure(Throwable error) {
+            failure = error;
             if( listener!=null ) {
-                failure = error;
                 listener.processFailure(error);
                 fireWatches();
             }


Reply via email to