Repository: qpid-jms Updated Branches: refs/heads/master 974037eac -> 926fc571d
https://issues.apache.org/jira/browse/QPIDJMS-175 Add the amqp.drainTimeout option to handle the case where the remote peer does not send any response to a drain request. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/926fc571 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/926fc571 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/926fc571 Branch: refs/heads/master Commit: 926fc571da5023137bb42035d84139b8da3a981d Parents: 974037e Author: Timothy Bish <[email protected]> Authored: Fri May 6 17:54:00 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri May 6 17:54:00 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 14 ++--- .../apache/qpid/jms/JmsConnectionListener.java | 24 ++++---- .../org/apache/qpid/jms/JmsMessageConsumer.java | 2 +- .../java/org/apache/qpid/jms/JmsSession.java | 16 ++--- .../jms/provider/DefaultProviderListener.java | 2 +- .../qpid/jms/provider/ProviderListener.java | 10 ++-- .../qpid/jms/provider/ProviderWrapper.java | 4 +- .../jms/provider/amqp/AmqpAbstractResource.java | 6 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 43 +++++++++---- .../qpid/jms/provider/amqp/AmqpProvider.java | 29 +++++++-- .../qpid/jms/JmsDefaultConnectionListener.java | 6 +- .../integration/ConsumerIntegrationTest.java | 63 +++++++++++++++++++- .../integration/ProducerIntegrationTest.java | 2 +- .../jms/integration/SessionIntegrationTest.java | 4 +- .../JmsConsumerPriorityDispatchTest.java | 12 ++-- .../jms/discovery/FileWatcherDiscoveryTest.java | 6 +- .../jms/discovery/JmsAmqpDiscoveryTest.java | 6 +- .../transactions/JmsTransactedConsumerTest.java | 6 +- 18 files changed, 178 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 4374954..ab9fb7a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -1194,7 +1194,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } @Override - public void onResourceRemotelyClosed(final JmsResource resource, final Exception cause) { + public void onResourceClosed(final JmsResource resource, final Exception cause) { // Closure of the Connection itself is notified via onConnectionFailure // Run on the connection executor to free the provider to go do more work and avoid @@ -1207,19 +1207,19 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection if (resource instanceof JmsSessionInfo) { JmsSession session = sessions.get(resource.getId()); if (session != null) { - session.remotelyClosed(cause); + session.sessionClosed(cause); for (JmsConnectionListener listener : connectionListeners) { - listener.onSessionRemotelyClosed(session, cause); + listener.onSessionClosed(session, cause); } } } else if (resource instanceof JmsProducerInfo) { JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId(); JmsSession session = sessions.get(parentId); if (session != null) { - JmsMessageProducer producer = session.producerRemotelyClosed((JmsProducerInfo) resource, cause); + JmsMessageProducer producer = session.producerClosed((JmsProducerInfo) resource, cause); if (producer != null) { for (JmsConnectionListener listener : connectionListeners) { - listener.onProducerRemotelyClosed(producer, cause); + listener.onProducerClosed(producer, cause); } } } 
@@ -1227,10 +1227,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId(); JmsSession session = sessions.get(parentId); if (session != null) { - JmsMessageConsumer consumer = session.consumerRemotelyClosed((JmsConsumerInfo) resource, cause); + JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause); if (consumer != null) { for (JmsConnectionListener listener : connectionListeners) { - listener.onConsumerRemotelyClosed(consumer, cause); + listener.onConsumerClosed(consumer, cause); } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java index f06f343..7aecc44 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java @@ -75,33 +75,33 @@ public interface JmsConnectionListener { void onInboundMessage(JmsInboundMessageDispatch envelope); /** - * Called when the remote peer closes a session. + * Called when the session is closed due to remote action or local error detection. * * @param session - * The session that was closed on the remote end. + * The session that was closed and needs to be cleaned up. * @param cause - * The exception that provides additional context on the remote closure. + * The exception that provides additional context on the closure. */ - void onSessionRemotelyClosed(Session session, Exception cause); + void onSessionClosed(Session session, Exception cause); /** - * Called when the remote peer closes a MessageConsumer. 
+ * Called when the MessageConsumer is closed due to remote action or local error detection. * * @param consumer - * The consumer that was closed on the remote end. + * The consumer that was closed and needs to be cleaned up. * @param cause - * The exception that provides additional context on the remote closure. + * The exception that provides additional context on the closure. */ - void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause); + void onConsumerClosed(MessageConsumer consumer, Exception cause); /** - * Called when the remote peer closes a MessageProducer. + * Called when the MessageProducer is closed due to remote action or local error detection. * * @param producer - * The producer that was closed on the remote end. + * The producer that was closed and needs to be cleaned up. * @param cause - * The exception that provides additional context on the remote closure. + * The exception that provides additional context on the closure. */ - void onProducerRemotelyClosed(MessageProducer producer, Exception cause); + void onProducerClosed(MessageProducer producer, Exception cause); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index 46f19ca..d63af7c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -257,7 +257,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe // is redundant: zero-prefetch consumers already pull, and // the rest block indefinitely on the local messageQueue. 
pullForced = true; - if(performPullIfRequired(timeout, true)) { + if (performPullIfRequired(timeout, true)) { startConsumerResource(); // We refresh credit if it is a prefetching consumer, since the // pull drained it. Processing acks can open the credit window, but http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 64c514e..3569fc7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -268,16 +268,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } - void remotelyClosed(Exception cause) { + void sessionClosed(Exception cause) { try { shutdown(cause); } catch (Throwable error) { - LOG.trace("Ignoring exception thrown during cleanup of remotely closed session", error); + LOG.trace("Ignoring exception thrown during cleanup of closed session", error); } } - JmsMessageConsumer consumerRemotelyClosed(JmsConsumerInfo resource, Exception cause) { - LOG.info("A JMS MessageConsumer has been remotely closed: {}", resource); + JmsMessageConsumer consumerClosed(JmsConsumerInfo resource, Exception cause) { + LOG.info("A JMS MessageConsumer has been closed: {}", resource); JmsMessageConsumer consumer = consumers.get(resource.getId()); @@ -286,14 +286,14 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe consumer.shutdown(cause); } } catch (Throwable error) { - LOG.trace("Ignoring exception thrown during cleanup of remotely closed consumer", error); + LOG.trace("Ignoring exception thrown during cleanup of closed consumer", error); } return consumer; } - JmsMessageProducer producerRemotelyClosed(JmsProducerInfo 
resource, Exception cause) { - LOG.info("A JMS MessageProducer has been remotely closed: {}", resource); + JmsMessageProducer producerClosed(JmsProducerInfo resource, Exception cause) { + LOG.info("A JMS MessageProducer has been closed: {}", resource); JmsMessageProducer producer = producers.get(resource.getId()); @@ -302,7 +302,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe producer.shutdown(cause); } } catch (Throwable error) { - LOG.trace("Ignoring exception thrown during cleanup of remotely closed producer", error); + LOG.trace("Ignoring exception thrown during cleanup of closed producer", error); } return producer; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java index b8ad9d2..22e204e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java @@ -56,7 +56,7 @@ public class DefaultProviderListener implements ProviderListener { } @Override - public void onResourceRemotelyClosed(JmsResource resource, Exception cause) { + public void onResourceClosed(JmsResource resource, Exception cause) { } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java index ebb653d..5c758ed 100644 --- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java @@ -112,15 +112,17 @@ public interface ProviderListener { void onConnectionFailure(IOException ex); /** - * Called to indicate that a currently active resource has been closed on the - * remote end due to management or other action. + * Called to indicate that a currently active resource has been closed + * due to some error condition, management request or some other action. + * This can either be initiated remotely or locally depending on the + * condition that triggers the close. * * @param resource - * the JmsResource instance that has been remotely closed. + * the JmsResource instance that has been closed. * @param cause * optional exception object that indicates the cause of the close. */ - void onResourceRemotelyClosed(JmsResource resource, Exception cause); + void onResourceClosed(JmsResource resource, Exception cause); /** * Called to indicate that a some client operation caused or received an http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java index e5c5799..8574e04 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java @@ -184,8 +184,8 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi } @Override - public void onResourceRemotelyClosed(JmsResource resource, Exception cause) { - listener.onResourceRemotelyClosed(resource, cause); + public void onResourceClosed(JmsResource resource, Exception cause) { + 
listener.onResourceClosed(resource, cause); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 81f8657..7476d9d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -144,8 +144,10 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } public void remotelyClosed(AmqpProvider provider) { - Exception error = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()); + locallyClosed(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition())); + } + public void locallyClosed(AmqpProvider provider, Exception error) { if (parent != null) { parent.removeChildResource(this); } @@ -159,7 +161,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp if (getResourceInfo() instanceof JmsConnectionInfo) { provider.fireProviderException(error); } else { - provider.fireResourceRemotelyClosed(getResourceInfo(), error); + provider.fireResourceClosed(getResourceInfo(), error); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 8f5e651..a8ef32d 100644 --- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -112,6 +112,23 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver // the peer sees the update. stopRequest = request; receiver.drain(0); + + if (getDrainTimeout() > 0) { + // If the remote doesn't respond we will close the consumer and break any + // blocked receive or stop calls that are waiting. + final ScheduledFuture<?> future = getSession().schedule(new Runnable() { + @Override + public void run() { + LOG.trace("Consumer {} drain request timed out", getConsumerId()); + IOException error = new IOException("Remote did not respond to a drain request in time"); + locallyClosed(session.getProvider(), error); + stopRequest.onFailure(error); + session.getProvider().pumpToProtonTransport(stopRequest); + } + }, getDrainTimeout()); + + stopRequest = new ScheduledRequest(future, stopRequest); + } } } @@ -124,14 +141,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver LOG.trace("Consumer {} running scheduled stop", getConsumerId()); if (getEndpoint().getRemoteCredit() != 0) { stop(request); - // TODO: We close the proton transport head to avoid this doing any writes if - // the TCP transport has gone, but it might be good to also avoid trying here. 
session.getProvider().pumpToProtonTransport(request); } } }, timeout); - stopRequest = new ScheduledStopRequest(future, request); + stopRequest = new ScheduledRequest(future, request); } @Override @@ -488,6 +503,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver this.presettle = presettle; } + public int getDrainTimeout() { + return session.getProvider().getDrainTimeout(); + } + @Override public String toString() { return "AmqpConsumer { " + getResourceInfo().getId() + " }"; @@ -544,26 +563,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver //----- Inner classes used in message pull operations --------------------// - protected static final class ScheduledStopRequest implements AsyncResult { + protected static final class ScheduledRequest implements AsyncResult { - private final ScheduledFuture<?> sheduledStopTask; + private final ScheduledFuture<?> sheduledTask; private final AsyncResult origRequest; - public ScheduledStopRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) { - this.sheduledStopTask = completionTask; + public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) { + this.sheduledTask = completionTask; this.origRequest = origRequest; } @Override - public void onFailure(Throwable t) { - sheduledStopTask.cancel(false); - origRequest.onFailure(t); + public void onFailure(Throwable cause) { + sheduledTask.cancel(false); + origRequest.onFailure(cause); } @Override public void onSuccess() { - boolean cancelled = sheduledStopTask.cancel(false); - if(cancelled) { + boolean cancelled = sheduledTask.cancel(false); + if (cancelled) { // Signal completion. Otherwise wait for the scheduled task to do it. 
origRequest.onSuccess(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 5520ec4..8e25c86 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -16,10 +16,6 @@ */ package org.apache.qpid.jms.provider.amqp; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.util.ReferenceCountUtil; - import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -77,6 +73,10 @@ import org.apache.qpid.proton.framing.TransportFrame; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.util.ReferenceCountUtil; + /** * An AMQP v1.0 Provider. 
* @@ -118,6 +118,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT; private int channelMax = DEFAULT_CHANNEL_MAX; private int idleTimeout = 60000; + private int drainTimeout = 60000; private long sessionOutoingWindow = -1; //Use proton default private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; @@ -924,10 +925,10 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } - void fireResourceRemotelyClosed(JmsResource resource, Exception ex) { + void fireResourceClosed(JmsResource resource, Exception ex) { ProviderListener listener = this.listener; if (listener != null) { - listener.onResourceRemotelyClosed(resource, ex); + listener.onResourceClosed(resource, ex); } } @@ -1029,6 +1030,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP this.idleTimeout = idleTimeout; } + public int getDrainTimeout() { + return drainTimeout; + } + + /** + * Sets the drain timeout (in milliseconds) after which a consumer will be + * treated as having failed and will be closed due to unknown state of the + * remote having not responded to the requested drain. + * + * @param drainTimeout + * the drainTimeout to use for receiver links. 
+ */ + public void setDrainTimeout(int drainTimeout) { + this.drainTimeout = drainTimeout; + } + public int getMaxFrameSize() { return maxFrameSize; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java index 80de69a..9186eba 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java @@ -49,14 +49,14 @@ public class JmsDefaultConnectionListener implements JmsConnectionListener { } @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { } @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) { + public void onConsumerClosed(MessageConsumer consumer, Exception cause) { } @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) { + public void onProducerClosed(MessageProducer producer, Exception cause) { } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index 5dc4e37..4ca76b7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -128,7 +128,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception exception) { + public void onConsumerClosed(MessageConsumer consumer, Exception exception) { consumerClosed.set(true); } }); @@ -754,6 +754,67 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout=30000) + public void testReceiveWithTimoutAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception { + doDrainWithNoResponseOnNoMessageTestImpl(false); + } + + @Test(timeout=30000) + public void testReceiveNoWaitAndNoDrainResponseFailsAfterTimeout() throws IOException, Exception { + doDrainWithNoResponseOnNoMessageTestImpl(true); + } + + private void doDrainWithNoResponseOnNoMessageTestImpl(boolean noWait) throws JMSException, InterruptedException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = null; + connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=500"); + + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // Expect receiver link attach and send credit + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); + + // Expect drain but do not respond so that the consumer times out. + testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsPrefetchPolicy.DEFAULT_QUEUE_PREFETCH))); + + // Consumer should close due to timed waiting for drain. 
+ testPeer.expectDetach(true, true, true); + + MessageConsumer consumer = session.createConsumer(queue); + + try { + if (noWait) { + consumer.receiveNoWait(); + } else { + consumer.receive(1); + } + + fail("Drain timeout should have aborted the receive."); + } catch (JMSException ex) { + LOG.info("Receive failed after drain timeout as expected: {}", ex.getMessage()); + } + + try { + consumer.getMessageSelector(); + fail("Should be closed and throw an exception"); + } catch (JMSException ex) { + } + + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + /* Check the clients view of the remaining credit stays in sync with the transports * even in the face of the remote peer advancing the delivery count unexpectedly, * ensuring the client doesn't later think there is credit when there is none. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 2e49080..485e5d2 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -802,7 +802,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception exception) { + public void onProducerClosed(MessageProducer producer, Exception exception) { producerClosed.set(true); } }); 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index cf4c0a6..53a4c43 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -1271,7 +1271,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { sessionClosed.set(true); } }); @@ -1334,7 +1334,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.addConnectionListener(new JmsDefaultConnectionListener() { @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { sessionClosed.set(true); } }); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java index 5c15770..e076455 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java @@ -77,15 +77,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport { } @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { } @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) { + public void onConsumerClosed(MessageConsumer consumer, Exception cause) { } @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) { + public void onProducerClosed(MessageProducer producer, Exception cause) { } }); @@ -145,15 +145,15 @@ public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport { } @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { } @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) { + public void onConsumerClosed(MessageConsumer consumer, Exception cause) { } @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) { + public void onProducerClosed(MessageProducer producer, Exception cause) { } }); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java index 19d8a0a..e0939fb 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java @@ -209,15 +209,15 @@ public class FileWatcherDiscoveryTest extends AmqpTestSupport { } @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { } @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) { + public void onConsumerClosed(MessageConsumer consumer, Exception cause) { } @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) { + public void onProducerClosed(MessageProducer producer, Exception cause) { } }); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java index 24b1a09..dee33f2 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java @@ -224,14 +224,14 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements 
JmsConnecti } @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { } @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) { + public void onConsumerClosed(MessageConsumer consumer, Exception cause) { } @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) { + public void onProducerClosed(MessageProducer producer, Exception cause) { } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/926fc571/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index d536b1e..47c7bfc 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ -360,15 +360,15 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { } @Override - public void onSessionRemotelyClosed(Session session, Exception exception) { + public void onSessionClosed(Session session, Exception exception) { } @Override - public void onConsumerRemotelyClosed(MessageConsumer consumer, Exception cause) { + public void onConsumerClosed(MessageConsumer consumer, Exception cause) { } @Override - public void onProducerRemotelyClosed(MessageProducer producer, Exception cause) { + public void onProducerClosed(MessageProducer producer, Exception cause) { } }); 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
