Repository: qpid-jms Updated Branches: refs/heads/master dd97edb72 -> f8470ec1b
QPIDJMS-157 Add timeout handling to close of resources like consumer, producer, session, etc. Fix possible double close call when producer is closed. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f8470ec1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f8470ec1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f8470ec1 Branch: refs/heads/master Commit: f8470ec1bc9f7280233593c5809c443a55f3ac92 Parents: dd97edb Author: Timothy Bish <tabish...@gmail.com> Authored: Wed Mar 23 18:21:09 2016 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Wed Mar 23 18:21:09 2016 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpAbstractResource.java | 34 ++++++++++++++++++-- .../provider/amqp/AmqpConnectionSession.java | 6 ++-- .../jms/provider/amqp/AmqpFixedProducer.java | 4 +-- .../integration/ConsumerIntegrationTest.java | 30 +++++++++++++++++ .../integration/ProducerIntegrationTest.java | 29 +++++++++++++++++ .../jms/integration/SessionIntegrationTest.java | 26 +++++++++++++++ 6 files changed, 122 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index a075900..81f8657 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -17,7 +17,9 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; +import java.util.concurrent.ScheduledFuture; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; @@ -41,6 +43,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class); protected AsyncResult closeRequest; + protected ScheduledFuture<?> closeTimeoutTask; private final E endpoint; private final R resourceInfo; @@ -97,6 +100,35 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp closeRequest = request; + long closeTimeout = getParent().getProvider().getRequestTimeout(); + if (closeTimeout != JmsConnectionInfo.INFINITE) { + closeTimeoutTask = getParent().getProvider().scheduleRequestTimeout( + new AsyncResult() { + + @Override + public void onSuccess() { + // Not called in this context. + } + + @Override + public void onFailure(Throwable result) { + closeRequest.onFailure(result); + closeRequest = null; + + // This ensures that the resource gets properly cleaned + // up, the request will have already completed so there + // won't be multiple events fired. + resourceClosed(); + } + + @Override + public boolean isComplete() { + return closeRequest != null ? closeRequest.isComplete() : true; + } + + }, closeTimeout, new JmsOperationTimedOutException("Timed Out Waiting for close response: " + this)); + } + closeOrDetachEndpoint(); } @@ -124,8 +156,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp endpoint.close(); } - LOG.info("Resource {} was remotely closed", getResourceInfo()); - if (getResourceInfo() instanceof JmsConnectionInfo) { provider.fireProviderException(error); } else { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java index 162feb4..04fc489 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java @@ -77,8 +77,8 @@ public class AmqpConnectionSession extends AmqpSession { private class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> { - public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver) { - super(resource, receiver); + public DurableSubscriptionReattach(JmsSessionInfo resource, Receiver receiver, AmqpResourceParent parent) { + super(resource, receiver, parent); } public String getSubscriptionName() { @@ -108,7 +108,7 @@ public class AmqpConnectionSession extends AmqpSession { @Override protected DurableSubscriptionReattach createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Receiver endpoint) { - return new DurableSubscriptionReattach(resourceInfo, endpoint); + return new DurableSubscriptionReattach(resourceInfo, endpoint, getProvider()); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index b582052..f548096 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -194,7 +194,7 @@ public class AmqpFixedProducer extends AmqpProducer { LOG.trace("Dispatching previously held send"); InFlightSend held = blocked.pop(); try { - doSend(held.envelope, held); // TODO - Cancel timeout and reset after dispatch ? + doSend(held.envelope, held); } catch (JMSException e) { throw IOExceptionSupport.create(e); } @@ -202,7 +202,7 @@ public class AmqpFixedProducer extends AmqpProducer { } // Once the pending sends queue is drained we can propagate the close request. - if (blocked.isEmpty() && isAwaitingClose()) { + if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) { super.close(closeRequest); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index f7715a9..762f345 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -43,6 +43,7 @@ import javax.jms.Topic; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsPrefetchPolicy; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -89,6 +90,35 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCloseConsumerTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + + testPeer.expectBegin(); + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(); + testPeer.expectDetach(true, false, true); + testPeer.expectClose(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + + try { + consumer.close(); + fail("Should have thrown a timed out exception"); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught excpected exception", jmsEx); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testRemotelyCloseConsumer() throws Exception { final String BREAD_CRUMB = "ErrorMessage"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index de1e6b3..f65a47a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -49,6 +49,7 @@ import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsSendTimedOutException; import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -99,6 +100,34 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCloseSenderTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + testPeer.expectDetach(true, false, true); + testPeer.expectClose(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageProducer producer = session.createProducer(queue); + + try { + producer.close(); + fail("Should have thrown a timed out exception"); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught excpected exception", jmsEx); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testSentTextMessageCanBeModified() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f8470ec1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 7275cac..e16fa55 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -112,6 +112,32 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCloseSessionTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + + testPeer.expectBegin(); + testPeer.expectEnd(false); + testPeer.expectClose(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull("Session should not be null", session); + + try { + session.close(); + fail("Should have thrown an timed out exception"); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught exception: {}", jmsEx.getMessage()); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateProducer() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org