Author: chirino
Date: Wed Nov 14 20:19:43 2012
New Revision: 1409375
URL: http://svn.apache.org/viewvc?rev=1409375&view=rev
Log:
Fixing up amqp transport client disconnects.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java?rev=1409375&r1=1409374&r2=1409375&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
Wed Nov 14 20:19:43 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.apollo.amqp.h
import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpTransport;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.SessionImpl;
@@ -176,4 +177,23 @@ public class AmqpConnection extends Amqp
public ProtocolTracer getProtocolTracer() {
return transport.getProtocolTracer();
}
+
+ /**
+ * Once the remote end closes, the transport is disconnected.
+ */
+ @Override
+ public void close() {
+ super.close();
+ onRemoteClose(new Callback<EndpointError>() {
+ @Override
+ public void onSuccess(EndpointError value) {
+ disconnect();
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ disconnect();
+ }
+ });
+ }
}
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1409375&r1=1409374&r2=1409375&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
Wed Nov 14 20:19:43 2012
@@ -505,6 +505,7 @@ public class AmqpTransport extends Watch
state = DISCONNECTED;
hawtdispatchTransport = null;
protonTransport = null;
+ fireWatches();
}
});
}
@@ -540,7 +541,7 @@ public class AmqpTransport extends Watch
addWatch(new Watch() {
@Override
public boolean execute() {
- if( state!=DISCONNECTED ) {
+ if( state==DISCONNECTED ) {
cb.onSuccess(null);
return true;
}