Author: chirino
Date: Wed Nov 14 19:19:21 2012
New Revision: 1409308
URL: http://svn.apache.org/viewvc?rev=1409308&view=rev
Log:
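Fixing AMQP implementation bugs: transport failures are now only handled while CONNECTED, the listener is null-checked before use, and the connection is stopped on non-graceful transport failures.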
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1409308&r1=1409307&r2=1409308&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Wed Nov 14 19:19:21 2012
@@ -549,10 +549,11 @@ class AmqpProtocolHandler extends Protoc
}
override def processTransportFailure(error: IOException) {
+ on_transport_disconnected()
if( !gracefully_closed ) {
connection_log.info("Shutting connection '%s' down due to: %s",
security_context.remote_address, error)
+ connection.stop(NOOP)
}
- on_transport_disconnected()
}
def processConnectionClose(conn: engine.Connection, onComplete: Task) {
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1409308&r1=1409307&r2=1409308&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
Wed Nov 14 19:19:21 2012
@@ -477,16 +477,18 @@ public class AmqpTransport extends Watch
@Override
public void onTransportFailure(IOException error) {
- if( state!=CONNECTED ) {
+ if( state==CONNECTED ) {
failure = error;
- listener.processTransportFailure(error);
- fireWatches();
+ if( listener!=null ) {
+ listener.processTransportFailure(error);
+ fireWatches();
+ }
}
}
void onFailure(Throwable error) {
+ failure = error;
if( listener!=null ) {
- failure = error;
listener.processFailure(error);
fireWatches();
}