Handle remote close of the Connection by firing the normal exception events and letting things close down normally.
applies the effective changes from 477df644369ab515daf2d5a600dd245d35c05335 which were temporarily reverted in d376110e3f9d86418640f1fe10ff266fd00be425 Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c5ff7299 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c5ff7299 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c5ff7299 Branch: refs/heads/master Commit: c5ff7299336b37f32cba869f25557f8684ca0a00 Parents: 297ad6f Author: Robert Gemmell <rob...@apache.org> Authored: Tue Feb 24 15:51:34 2015 +0000 Committer: Robert Gemmell <rob...@apache.org> Committed: Tue Feb 24 15:51:34 2015 +0000 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 6 ++++- .../jms/provider/amqp/AmqpAbstractResource.java | 21 ++++++++++++++-- .../qpid/jms/provider/amqp/AmqpProvider.java | 7 ++++++ .../qpid/jms/provider/amqp/AmqpResource.java | 3 +++ .../integration/ConnectionIntegrationTest.java | 26 ++++++++++++++++++++ 5 files changed, 60 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 0b73cb4..32470ee 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -1101,7 +1101,11 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti @Override public void onResourceRemotelyClosed(JmsResource resource, Exception cause) { - LOG.info("A JMS resource has been remotely closed: {}", resource); + if (resource.equals(this.connectionInfo)) { + onException(cause); + } else { + LOG.info("A JMS resource has been remotely closed: {}", resource); + } } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index a0d308a..1f4c742 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -22,6 +22,7 @@ import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.JMSSecurityException; +import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.proton.amqp.Symbol; @@ -163,8 +164,24 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public void remotelyClosed(AmqpProvider provider) { - // TODO - We need a way to signal that the remote closed unexpectedly. - LOG.info("Resource was remotely closed"); + Exception error = getRemoteError(); + if (error == null) { + error = new IOException("Remote has closed without error information"); + } + + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); + } + + LOG.info("Resource {} was remotely closed", getJmsResource()); + + if (getJmsResource() instanceof JmsConnectionInfo) { + provider.fireProviderException(error); + } else { + provider.fireResourceRemotelyClosed(getJmsResource(), error); + } } public E getEndpoint() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 3c4cae6..0face7b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -802,6 +802,13 @@ public class AmqpProvider implements Provider, TransportListener { } } + void fireResourceRemotelyClosed(JmsResource resource, Exception ex) { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onResourceRemotelyClosed(resource, ex); + } + } + private void checkClosed() throws ProviderClosedException { if (closed.get()) { throw new ProviderClosedException("This Provider is already closed"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java index 226f9ff..33333ea 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java @@ -87,6 +87,9 @@ public interface AmqpResource { /** * Called to indicate that the remote end has become closed but the resource * was not awaiting an open/close completion. + * + * @param provider + * a reference to the AMQP provider to use to send the remote close event. */ void remotelyClosed(AmqpProvider provider); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ff7299/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 72960bd..4d24c71 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -25,9 +25,14 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.ConnectionMetaData; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; @@ -91,6 +96,27 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 10000) + public void testRemotelyEndConnectionListenerInvoked() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final CountDownLatch done = new CountDownLatch(1); + + Connection connection = testFixture.establishConnecton(testPeer); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + done.countDown(); + } + }); + + testPeer.remotelyEndConnection(true); + testPeer.waitForAllHandlersToComplete(1000); + + assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS)); + } + } + @Ignore // TODO: resolve related issues and enable @Test(timeout = 5000) public void testRemotelyEndConnectionWithSessionWithConsumer() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org