Handle remote close of the Connection by firing the normal exception
events and letting things close down normally.

Applies the effective changes from 477df644369ab515daf2d5a600dd245d35c05335,
which were temporarily reverted in d376110e3f9d86418640f1fe10ff266fd00be425.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c5ff7299
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c5ff7299
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c5ff7299

Branch: refs/heads/master
Commit: c5ff7299336b37f32cba869f25557f8684ca0a00
Parents: 297ad6f
Author: Robert Gemmell <rob...@apache.org>
Authored: Tue Feb 24 15:51:34 2015 +0000
Committer: Robert Gemmell <rob...@apache.org>
Committed: Tue Feb 24 15:51:34 2015 +0000

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  6 ++++-
 .../jms/provider/amqp/AmqpAbstractResource.java | 21 ++++++++++++++--
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  7 ++++++
 .../qpid/jms/provider/amqp/AmqpResource.java    |  3 +++
 .../integration/ConnectionIntegrationTest.java  | 26 ++++++++++++++++++++
 5 files changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 0b73cb4..32470ee 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1101,7 +1101,11 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
 
     @Override
     public void onResourceRemotelyClosed(JmsResource resource, Exception 
cause) {
-        LOG.info("A JMS resource has been remotely closed: {}", resource);
+        if (resource.equals(this.connectionInfo)) {
+            onException(cause);
+        } else {
+            LOG.info("A JMS resource has been remotely closed: {}", resource);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index a0d308a..1f4c742 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -22,6 +22,7 @@ import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
 
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -163,8 +164,24 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
 
     @Override
     public void remotelyClosed(AmqpProvider provider) {
-        // TODO - We need a way to signal that the remote closed unexpectedly.
-        LOG.info("Resource was remotely closed");
+        Exception error = getRemoteError();
+        if (error == null) {
+            error = new IOException("Remote has closed without error 
information");
+        }
+
+        if (endpoint != null) {
+            // TODO: if this is a producer/consumer link then we may only be 
detached,
+            // rather than fully closed, and should respond appropriately.
+            endpoint.close();
+        }
+
+        LOG.info("Resource {} was remotely closed", getJmsResource());
+
+        if (getJmsResource() instanceof JmsConnectionInfo) {
+            provider.fireProviderException(error);
+        } else {
+            provider.fireResourceRemotelyClosed(getJmsResource(), error);
+        }
     }
 
     public E getEndpoint() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3c4cae6..0face7b 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -802,6 +802,13 @@ public class AmqpProvider implements Provider, 
TransportListener {
         }
     }
 
+    void fireResourceRemotelyClosed(JmsResource resource, Exception ex) {
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onResourceRemotelyClosed(resource, ex);
+        }
+    }
+
     private void checkClosed() throws ProviderClosedException {
         if (closed.get()) {
             throw new ProviderClosedException("This Provider is already 
closed");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
index 226f9ff..33333ea 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -87,6 +87,9 @@ public interface AmqpResource {
     /**
      * Called to indicate that the remote end has become closed but the 
resource
      * was not awaiting an open/close completion.
+     *
+     * @param provider
+     *        a reference to the AMQP provider to use to send the remote close 
event.
      */
     void remotelyClosed(AmqpProvider provider);
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 72960bd..4d24c71 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -25,9 +25,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionMetaData;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -91,6 +96,27 @@ public class ConnectionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 10000)
+    public void testRemotelyEndConnectionListenerInvoked() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch done = new CountDownLatch(1);
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    done.countDown();
+                }
+            });
+
+            testPeer.remotelyEndConnection(true);
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            assertTrue("Connection should report failure", done.await(5, 
TimeUnit.SECONDS));
+        }
+    }
+
     @Ignore // TODO: resolve related issues and enable
     @Test(timeout = 5000)
     public void testRemotelyEndConnectionWithSessionWithConsumer() throws 
Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to