Repository: qpid-jms Updated Branches: refs/heads/master 4e963e442 -> e0b5980c0
QPIDJMS-157 Adds support for sendTimeout and requestTimeout in the AMQP provider. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e0b5980c Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e0b5980c Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e0b5980c Branch: refs/heads/master Commit: e0b5980c077555b0d0a3c356d6d0580ec87c6497 Parents: 4e963e4 Author: Timothy Bish <[email protected]> Authored: Tue Mar 22 16:42:59 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Mar 22 16:42:59 2016 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpFixedProducer.java | 109 ++++++--- .../qpid/jms/provider/amqp/AmqpProvider.java | 44 +++- .../amqp/AmqpTransactionCoordinator.java | 20 ++ .../amqp/builders/AmqpConnectionBuilder.java | 5 + .../amqp/builders/AmqpResourceBuilder.java | 31 ++- .../org/apache/qpid/jms/JmsConnectionTest.java | 2 +- .../FailedConnectionsIntegrationTest.java | 29 ++- .../integration/ProducerIntegrationTest.java | 152 +++++++++++- .../QueueBrowserIntegrationTest.java | 20 +- .../jms/integration/SessionIntegrationTest.java | 7 +- .../TransactionsIntegrationTest.java | 242 +++++++++++++++++++ .../failover/FailoverIntegrationTest.java | 6 +- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 66 ++++- 13 files changed, 644 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 1282138..b582052 100644 --- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -22,11 +22,14 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import javax.jms.JMSException; +import org.apache.qpid.jms.JmsSendTimedOutException; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.message.facade.JmsMessageFacade; +import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsProducerInfo; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade; @@ -57,8 +60,8 @@ public class AmqpFixedProducer extends AmqpProducer { private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); - private final Set<Delivery> pending = new LinkedHashSet<Delivery>(); - private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>(); + private final Set<Delivery> sent = new LinkedHashSet<Delivery>(); + private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>(); private byte[] encodeBuffer = new byte[1024 * 8]; private boolean presettle = false; @@ -73,7 +76,7 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public void close(AsyncResult request) { // If any sends are held we need to wait for them to complete. - if (!pendingSends.isEmpty()) { + if (!blocked.isEmpty()) { this.closeRequest = request; return; } @@ -83,15 +86,20 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { - // TODO - Handle the case where remote has no credit which means we can't send to it. 
- // We need to hold the send until remote credit becomes available but we should - // also have a send timeout option and filter timed out sends. if (getEndpoint().getCredit() <= 0) { LOG.trace("Holding Message send until credit is available."); // Once a message goes into a held mode we no longer can send it async, so // we clear the async flag if set to avoid the sender never getting notified. envelope.setSendAsync(false); - this.pendingSends.addLast(new PendingSend(envelope, request)); + + InFlightSend send = new InFlightSend(envelope, request); + + if (getSendTimeout() > JmsConnectionInfo.INFINITE) { + send.requestTimeout = getParent().getProvider().scheduleRequestTimeout( + send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage())); + } + + blocked.addLast(send); return false; } else { doSend(envelope, request); @@ -135,12 +143,20 @@ public class AmqpFixedProducer extends AmqpProducer { if (presettle) { delivery.settle(); } else { - pending.add(delivery); + sent.add(delivery); getEndpoint().advance(); } if (envelope.isSendAsync() || presettle) { request.onSuccess(); + } else if (getSendTimeout() != JmsConnectionInfo.INFINITE) { + InFlightSend send = new InFlightSend(envelope, request); + + send.requestTimeout = getParent().getProvider().scheduleRequestTimeout( + send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage())); + + // Update context so the incoming disposition can cancel any pending timeout + delivery.setContext(send); } } @@ -173,12 +189,12 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public void processFlowUpdates(AmqpProvider provider) throws IOException { - if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) { - while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) { + if (!blocked.isEmpty() && getEndpoint().getCredit() > 0) { + while (getEndpoint().getCredit() > 0 && 
!blocked.isEmpty()) { LOG.trace("Dispatching previously held send"); - PendingSend held = pendingSends.pop(); + InFlightSend held = blocked.pop(); try { - doSend(held.envelope, held.request); + doSend(held.envelope, held); // TODO - Cancel timeout and reset after dispatch ? } catch (JMSException e) { throw IOExceptionSupport.create(e); } @@ -186,7 +202,7 @@ public class AmqpFixedProducer extends AmqpProducer { } // Once the pending sends queue is drained we can propagate the close request. - if (pendingSends.isEmpty() && isAwaitingClose()) { + if (blocked.isEmpty() && isAwaitingClose()) { super.close(closeRequest); } @@ -197,7 +213,7 @@ public class AmqpFixedProducer extends AmqpProducer { public void processDeliveryUpdates(AmqpProvider provider) throws IOException { List<Delivery> toRemove = new ArrayList<Delivery>(); - for (Delivery delivery : pending) { + for (Delivery delivery : sent) { DeliveryState state = delivery.getRemoteState(); if (state == null) { continue; @@ -251,7 +267,7 @@ public class AmqpFixedProducer extends AmqpProducer { delivery.settle(); } - pending.removeAll(toRemove); + sent.removeAll(toRemove); super.processDeliveryUpdates(provider); } @@ -275,22 +291,15 @@ public class AmqpFixedProducer extends AmqpProducer { return presettle; } + public long getSendTimeout() { + return getParent().getProvider().getSendTimeout(); + } + @Override public String toString() { return "AmqpFixedProducer { " + getProducerId() + " }"; } - private static class PendingSend { - - public JmsOutboundMessageDispatch envelope; - public AsyncResult request; - - public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) { - this.envelope = envelope; - this.request = request; - } - } - @Override public void remotelyClosed(AmqpProvider provider) { super.remotelyClosed(provider); @@ -301,7 +310,7 @@ public class AmqpFixedProducer extends AmqpProducer { ex = new JMSException("Producer closed remotely before message transfer result was notified"); } - for 
(Delivery delivery : pending) { + for (Delivery delivery : sent) { try { AsyncResult request = (AsyncResult) delivery.getContext(); @@ -316,6 +325,50 @@ public class AmqpFixedProducer extends AmqpProducer { } } - pending.clear(); + sent.clear(); + } + + //----- Class used to manage held sends ----------------------------------// + + private class InFlightSend implements AsyncResult { + + public final JmsOutboundMessageDispatch envelope; + public final AsyncResult request; + + public ScheduledFuture<?> requestTimeout; + + public InFlightSend(JmsOutboundMessageDispatch envelope, AsyncResult request) { + this.envelope = envelope; + this.request = request; + } + + @Override + public void onFailure(Throwable cause) { + if (requestTimeout != null) { + requestTimeout.cancel(false); + requestTimeout = null; + } + + blocked.remove(this); + + request.onFailure(cause); + } + + @Override + public void onSuccess() { + if (requestTimeout != null) { + requestTimeout.cancel(false); + requestTimeout = null; + } + + blocked.remove(this); + + request.onSuccess(); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 4cb1028..5520ec4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -768,39 +768,57 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP switch (protonEvent.getType()) { case CONNECTION_REMOTE_CLOSE: amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext(); - 
amqpEventSink.processRemoteClose(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteClose(this); + } break; case CONNECTION_REMOTE_OPEN: amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext(); - amqpEventSink.processRemoteOpen(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteOpen(this); + } break; case SESSION_REMOTE_CLOSE: amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext(); - amqpEventSink.processRemoteClose(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteClose(this); + } break; case SESSION_REMOTE_OPEN: amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext(); - amqpEventSink.processRemoteOpen(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteOpen(this); + } break; case LINK_REMOTE_CLOSE: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); - amqpEventSink.processRemoteClose(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteClose(this); + } break; case LINK_REMOTE_DETACH: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); - amqpEventSink.processRemoteDetach(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteDetach(this); + } break; case LINK_REMOTE_OPEN: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); - amqpEventSink.processRemoteOpen(this); + if (amqpEventSink != null) { + amqpEventSink.processRemoteOpen(this); + } break; case LINK_FLOW: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); - amqpEventSink.processFlowUpdates(this); + if (amqpEventSink != null) { + amqpEventSink.processFlowUpdates(this); + } break; case DELIVERY: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); - amqpEventSink.processDeliveryUpdates(this); + if (amqpEventSink != null) { + amqpEventSink.processDeliveryUpdates(this); + } break; default: break; @@ -1137,13 +1155,15 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP * 
* @param request * The request that should be marked as failed based on configuration. + * @param timeout + * The time to wait before marking the request as failed. * @param error * The error to use when failing the pending request. * * @return a {@link ScheduledFuture} that can be stored by the caller. */ - public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) { - if (getRequestTimeout() != JmsConnectionInfo.INFINITE) { + public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final Exception error) { + if (timeout != JmsConnectionInfo.INFINITE) { return serializer.schedule(new Runnable() { @Override @@ -1152,7 +1172,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP pumpToProtonTransport(); } - }, getRequestTimeout(), TimeUnit.MILLISECONDS); + }, timeout, TimeUnit.MILLISECONDS); } return null; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java index b06f686..f0ddf96 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java @@ -18,11 +18,14 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; import java.nio.BufferOverflowException; +import java.util.concurrent.ScheduledFuture; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.TransactionRolledBackException; +import org.apache.qpid.jms.JmsOperationTimedOutException; +import org.apache.qpid.jms.meta.JmsConnectionInfo; 
import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.provider.AsyncResult; @@ -57,6 +60,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI private Delivery pendingDelivery; private AsyncResult pendingRequest; + private ScheduledFuture<?> pendingTimeout; public AmqpTransactionCoordinator(JmsSessionInfo resourceInfo, Sender endpoint, AmqpResourceParent parent) { super(resourceInfo, endpoint, parent); @@ -94,6 +98,11 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI pendingDelivery.settle(); pendingRequest = null; pendingDelivery = null; + + if (pendingTimeout != null) { + pendingTimeout.cancel(false); + pendingTimeout = null; + } } super.processDeliveryUpdates(provider); @@ -120,6 +129,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI pendingDelivery.setContext(txId); pendingRequest = request; + scheduleTimeoutIfNeeded("Timed out waiting for declare of new TX."); + sendTxCommand(message); } @@ -154,6 +165,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI pendingDelivery.setContext(txId); pendingRequest = request; + scheduleTimeoutIfNeeded("Timed out waiting for discharge of TX."); + sendTxCommand(message); } @@ -190,6 +203,13 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI //----- Internal implementation ------------------------------------------// + private void scheduleTimeoutIfNeeded(String cause) { + AmqpProvider provider = getParent().getProvider(); + if (provider.getRequestTimeout() != JmsConnectionInfo.INFINITE) { + provider.scheduleRequestTimeout(pendingRequest, provider.getRequestTimeout(), new JmsOperationTimedOutException(cause)); + } + } + private void sendTxCommand(Message message) throws IOException { int encodedSize = 0; byte[] buffer = OUTBOUND_BUFFER; 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java index 1f74682..71a4dd4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java @@ -134,4 +134,9 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A protected boolean isClosePending() { return getResource().getProperties().isConnectionOpenFailed(); } + + @Override + protected long getRequestTimeout() { + return getParent().getProvider().getConnectTimeout(); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index c2353e8..229e61f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -19,6 +19,8 @@ package org.apache.qpid.jms.provider.amqp.builders; import java.io.IOException; import java.util.concurrent.ScheduledFuture; +import org.apache.qpid.jms.JmsOperationTimedOutException; +import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import 
org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.amqp.AmqpEventSink; @@ -74,7 +76,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex // Create the resource object now resource = createResource(parent, resourceInfo, endpoint); - if (parent.getProvider().getRequestTimeout() > 0) { + if (getRequestTimeout() > JmsConnectionInfo.INFINITE) { // Attempt to schedule a cancellation of the pending open request, can return // null if there is no configured request timeout. @@ -87,9 +89,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex @Override public void onFailure(Throwable result) { - // We ignore the default error and attempt to coerce a more - // meaningful error from the endpoint. - handleClosed(parent.getProvider()); + handleClosed(parent.getProvider(), result); } @Override @@ -97,7 +97,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex return request.isComplete(); } - }, null); + }, getRequestTimeout(), new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out")); } } @@ -110,7 +110,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex @Override public void processRemoteClose(AmqpProvider provider) throws IOException { - handleClosed(provider); + handleClosed(provider, null); } @Override @@ -158,13 +158,15 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex } } - protected final void handleClosed(AmqpProvider provider) { + protected final void handleClosed(AmqpProvider provider, Throwable cause) { // If the resource being built is closed during the creation process // then this is always an error. 
- Exception openError; + Throwable openError; if (hasRemoteError()) { openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()); + } else if (cause != null) { + openError = cause; } else { openError = getOpenAbortException(); } @@ -244,11 +246,24 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex * When aborting the open operation, and there isn't an error condition, * provided by the peer, the returned exception will be used instead. * A subclass may override this method to provide alternative behavior. + * + * @return an Exception that describes the open failure for this resource. */ protected Exception getOpenAbortException() { return new IOException("Open failed unexpectedly."); } + /** + * Returns the configured time before the open of the resource is considered + * to have failed. Subclasses can override this method to provide a value more + * appropriate to the resource being built. + * + * @return the configured timeout before the open of the resource fails. 
+ */ + protected long getRequestTimeout() { + return getParent().getProvider().getRequestTimeout(); + } + //----- Public access methods for the managed resources ------------------// public ENDPOINT getEndpoint() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java index 6dabcca..bf5bbcf 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java @@ -68,7 +68,7 @@ public class JmsConnectionTest { @Test(timeout=30000, expected=JMSException.class) public void testJmsConnectionThrowsJMSExceptionProviderStartFails() throws JMSException, IllegalStateException, IOException { provider.getConfiguration().setFailOnStart(true); - new JmsConnection("ID:TEST:1", provider, clientIdGenerator); + try (JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);) {} } @Test(timeout=30000) http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java index b9c44af..556ae64 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java @@ -34,6 +34,7 @@ import javax.jms.InvalidDestinationException; import 
javax.jms.JMSException; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.provider.ProviderRedirectedException; import org.apache.qpid.jms.provider.amqp.AmqpSupport; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -71,6 +72,24 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testConnectThrowsTimedOutExceptioWhenResponseNotSent() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.expectSaslAnonymousConnect(true); + testPeer.expectClose(); + try { + establishAnonymousConnecton(testPeer, true, "jms.connectTimeout=500"); + fail("Should have thrown JmsOperationTimedOutException"); + } catch (JmsOperationTimedOutException jmsEx) { + // Expected + } catch (Exception ex) { + fail("Should have thrown JMSException: " + ex); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testConnectWithNotFoundErrorThrowsJMSEWhenInvalidContainerHintNotPresent() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { testPeer.rejectConnect(AmqpError.NOT_FOUND, "Virtual Host does not exist", null); @@ -158,8 +177,16 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase { } Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId) throws JMSException { + return establishAnonymousConnecton(testPeer, setClientId, null); + } + + Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId, String connectionQuery) throws JMSException { - final String remoteURI = "amqp://localhost:" + testPeer.getServerPort(); + String remoteURI = "amqp://localhost:" + testPeer.getServerPort(); + + if (connectionQuery != null && !connectionQuery.isEmpty()) { + remoteURI += "?" 
+ connectionQuery; + } ConnectionFactory factory = new JmsConnectionFactory(remoteURI); Connection connection = factory.createConnection(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 2dd723b..de1e6b3 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -49,6 +49,7 @@ import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsSendTimedOutException; import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; @@ -88,7 +89,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { MessageProducer producer = session.createProducer(queue); testPeer.expectDetach(true, true, true); + testPeer.expectClose(); + producer.close(); + connection.close(); testPeer.waitForAllHandlersToComplete(1000); } @@ -116,6 +120,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); TextMessage message = session.createTextMessage(text); producer.send(message); @@ -124,6 +129,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { message.setText(text + text); assertEquals(text + text, message.getText()); + connection.close(); + 
testPeer.waitForAllHandlersToComplete(1000); } } @@ -147,12 +154,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setHeadersMatcher(headersMatcher); messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(); producer.send(message); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -178,6 +188,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setHeadersMatcher(headersMatcher); messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(); message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); @@ -187,6 +198,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -219,6 +232,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); @@ -228,6 +242,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertEquals("Should have had JMSDestination set", queue, message.getJMSDestination()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -263,11 +279,14 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = 
session.createTextMessage(text); producer.send(message); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -301,14 +320,17 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); assertEquals("JMSTimestamp should not yet be set", 0, message.getJMSTimestamp()); producer.setDisableMessageTimestamp(true); - producer.send(message); + + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); assertEquals("JMSTimestamp should still not be set", 0, message.getJMSTimestamp()); @@ -350,11 +372,14 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -406,12 +431,15 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); message.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, amqpTtl); producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, jmsTtl); + connection.close(); + testPeer.waitForAllHandlersToComplete(2000); } } @@ -441,6 +469,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setHeadersMatcher(headersMatcher); 
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(); @@ -450,6 +479,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -481,6 +512,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setHeadersMatcher(headersMatcher); messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(); @@ -490,6 +522,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertEquals(priority, message.getJMSPriority()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -522,6 +556,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); @@ -533,7 +568,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertNotNull("JMSMessageID should be set", jmsMessageID); assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:")); - //Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally + connection.close(); + + // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally testPeer.waitForAllHandlersToComplete(1000); Object receivedMessageId = propsMatcher.getReceivedMessageId(); @@ -571,6 +608,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); 
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); @@ -582,7 +620,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertNotNull("JMSMessageID should be set", jmsMessageID); assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:")); - //Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally + connection.close(); + + // Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally testPeer.waitForAllHandlersToComplete(1000); Object receivedMessageId = propsMatcher.getReceivedMessageId(); @@ -621,6 +661,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); @@ -632,7 +673,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertNotNull("JMSMessageID should be set", jmsMessageID); assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:")); - //Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally + connection.close(); + + // Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally testPeer.waitForAllHandlersToComplete(1000); Object receivedMessageId = propsMatcher.getReceivedMessageId(); @@ -687,6 +730,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { messageMatcher.setPropertiesMatcher(propsMatcher); messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); Message message = session.createTextMessage(text); 
@@ -704,6 +748,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertNull("JMSMessageID should be null", message.getJMSMessageID()); + connection.close(); + testPeer.waitForAllHandlersToComplete(2000); } } @@ -778,6 +824,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { // response, simply remotely close the producer instead. testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false); testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB); + testPeer.expectClose(); Queue queue = session.createQueue("myQueue"); final MessageProducer producer = session.createProducer(queue); @@ -793,6 +840,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB)); } + connection.close(); + testPeer.waitForAllHandlersToComplete(3000); } } @@ -860,10 +909,89 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { testPeer.expectSenderAttach(100); testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); MessageProducer producer = session.createProducer(queue); producer.send(message); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testSendWhenLinkCreditIsZeroAndTimeout() throws Exception { + try(TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setSendTimeout(500); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + + Message message = session.createTextMessage("text"); + + // Expect the producer to attach. Don't send any credit so that the client will + // block on a send and we can test our timeouts. 
+ testPeer.expectSenderAttachWithoutGrantingCredit(); + testPeer.expectClose(); + + MessageProducer producer = session.createProducer(queue); + + try { + producer.send(message); + fail("Send should time out."); + } catch (JmsSendTimedOutException jmsEx) { + LOG.info("Caught expected error: {}", jmsEx.getMessage()); + } catch (Throwable error) { + fail("Send should time out, but got: " + error.getMessage()); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testSendTimesOutWhenNoDispostionArrives() throws Exception { + try(TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setSendTimeout(500); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + + Message message = session.createTextMessage("text"); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + + // Expect the producer to attach and grant it some credit, it should send + // a transfer which we will not send any response for which should cause the + // send operation to time out. 
+ testPeer.expectSenderAttach(100); + testPeer.expectTransferButDoNotRespond(messageMatcher); + testPeer.expectClose(); + + MessageProducer producer = session.createProducer(queue); + + try { + producer.send(message); + fail("Send should time out."); + } catch (JmsSendTimedOutException jmsEx) { + LOG.info("Caught expected error: {}", jmsEx.getMessage()); + } catch (Throwable error) { + fail("Send should time out, but got: " + error.getMessage()); + } + + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -915,6 +1043,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { Message message = session.createTextMessage("content"); testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, responseState, true); + testPeer.expectClose(); assertNull("Should not yet have a JMSDestination", message.getJMSDestination()); @@ -926,6 +1055,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { // Expected } + connection.close(); + testPeer.waitForAllHandlersToComplete(2000); } } @@ -967,13 +1098,12 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { private void doAsyncSendMessageNotAcceptedTestImpl(ListDescribedType responseState) throws JMSException, InterruptedException, Exception, IOException { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); final CountDownLatch asyncError = new CountDownLatch(1); - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.setForceAsyncSend(true); - jmsConnection.setExceptionListener(new ExceptionListener() { + connection.setForceAsyncSend(true); + connection.setExceptionListener(new ExceptionListener() { @Override public void onException(JMSException exception) { @@ -1011,6 +1141,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertTrue("Should get a non-fatal 
error", asyncError.await(10, TimeUnit.SECONDS)); testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); try { producer.send(message); @@ -1019,6 +1150,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { fail("No expected exception for this send."); } + connection.close(); + testPeer.waitForAllHandlersToComplete(2000); } } @@ -1078,7 +1211,10 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { producer.send(session.createMessage()); testPeer.expectDetach(true, true, true); + testPeer.expectClose(); + producer.close(); + connection.close(); testPeer.waitForAllHandlersToComplete(1000); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java index b13fa00..b041f51 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java @@ -146,11 +146,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.start(); - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setAll(1); + connection.getPrefetchPolicy().setAll(1); testPeer.expectBegin(); @@ -305,11 +304,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { @Test(timeout=30000) 
public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.start(); - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setAll(0); + connection.getPrefetchPolicy().setAll(0); testPeer.expectBegin(); @@ -333,11 +331,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { @Test(timeout=30000) public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.start(); - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setAll(0); + connection.getPrefetchPolicy().setAll(0); testPeer.expectBegin(); @@ -365,11 +362,10 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase { DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); connection.start(); - JmsConnection jmsConnection = (JmsConnection) connection; - jmsConnection.getPrefetchPolicy().setAll(0); + connection.getPrefetchPolicy().setAll(0); testPeer.expectBegin(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 415ffb1..7275cac 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -56,6 +56,7 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsPrefetchPolicy; import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -243,7 +244,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { // even though there is no detach response. session.createConsumer(dest); fail("Consumer creation should have failed when link was refused"); - } catch(JMSException ex) { + } catch(JmsOperationTimedOutException ex) { // Expected LOG.info("Caught expected error on consumer create: {}", ex.getMessage()); } @@ -276,7 +277,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { QueueBrowser browser = session.createBrowser(dest); browser.getEnumeration(); fail("Consumer creation should have failed when link was refused"); - } catch(JMSException ex) { + } catch(JmsOperationTimedOutException ex) { // Expected LOG.info("Caught expected error on browser create: {}", ex.getMessage()); } @@ -995,7 +996,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { // Create a producer, expect it to throw exception due to the link-refusal session.createProducer(dest); fail("Producer creation should have failed when link was refused"); - } catch(JMSException ex) { + } catch(JmsOperationTimedOutException ex) { // Expected LOG.info("Caught expected exception on create: {}", ex.getMessage()); } 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java index 2ef7046..dfd2f1c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java @@ -39,6 +39,7 @@ import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsPrefetchPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; @@ -146,6 +147,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { // reply with a declared disposition state containing the txnId. 
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); try { session.commit(); @@ -153,6 +156,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { } catch (TransactionRolledBackException jmsTxRb) { } + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -225,9 +230,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { txState.setOutcome(new Accepted()); testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); producer.send(session.createMessage()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -300,9 +309,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { txState.setOutcome(new Accepted()); testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); producer.send(session.createMessage()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -410,6 +423,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { messageConsumer.close(); } + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -450,9 +468,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { // reply with a declared disposition state containing the txnId. txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); session.commit(); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -493,9 +515,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { // reply with a declared disposition state containing the txnId. 
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); session.rollback(); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -537,9 +563,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { txState.setOutcome(new Accepted()); testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); producer.send(session.createMessage()); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -634,8 +664,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO)); } + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + session.rollback(); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -705,8 +740,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { // Expect the consumer to be 'started' again as rollback completes testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + session.rollback(); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -780,9 +820,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { // Expect the consumer to be 'started' again as rollback completes testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); session.rollback(); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -814,9 +858,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); testPeer.expectLinkFlow(); + testPeer.expectDischarge(txnId, true); + 
testPeer.expectClose(); session.createConsumer(queue); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -835,6 +883,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, false); testPeer.expectCoordinatorAttach(); testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -845,6 +895,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { LOG.info("Caught expected TransactionRolledBackException"); } + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -863,6 +915,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, true); testPeer.expectCoordinatorAttach(); testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -873,6 +927,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { LOG.info("Caught expected JMSException"); } + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -911,12 +967,18 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.expectCoordinatorAttach(); testPeer.expectDeclare(txnId); + // Expect that the session TX will rollback on close. + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + try { session.commit(); fail("Commit operation should have failed."); } catch (TransactionRolledBackException jmsTxRb) { } + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -960,12 +1022,192 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.expectCoordinatorAttach(); testPeer.expectDeclare(txnId); + // Expect that the session TX will rollback on close. 
+ testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + try { session.commit(); fail("Commit operation should have failed."); } catch (TransactionRolledBackException jmsTxRb) { } + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testSessionCreateFailsOnDeclareTimeout() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + testPeer.expectDeclareButDoNotRespond(); + testPeer.expectClose(); + + try { + connection.createSession(true, Session.SESSION_TRANSACTED); + fail("Should have timed out waiting for declare."); + } catch (JmsOperationTimedOutException jmsEx) { + } catch (Throwable error) { + fail("Should have caught an timed out exception:"); + LOG.error("Caught -> ", error); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testTransactionRolledBackOnSessionCloseTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + // Closed session should roll-back the TX with a failed discharge + testPeer.expectDischargeButDoNotRespond(txnId, true); + testPeer.expectClose(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + try { + session.close(); + fail("Should have timed out waiting for declare."); + } catch (JmsOperationTimedOutException jmsEx) { + } catch (Throwable error) { + fail("Should have caught 
an timed out exception:"); + LOG.error("Caught -> ", error); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testTransactionRolledBackTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + // Closed session should roll-back the TX with a failed discharge + testPeer.expectDischargeButDoNotRespond(txnId, true); + + // Session should throw from the rollback and then try and recover. + testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + try { + session.rollback(); + fail("Should have timed out waiting for declare."); + } catch (JmsOperationTimedOutException jmsEx) { + } catch (Throwable error) { + fail("Should have caught an timed out exception:"); + LOG.error("Caught -> ", error); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testTransactionCommitTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + // Closed session should roll-back the TX with a failed discharge + testPeer.expectDischargeButDoNotRespond(txnId, false); + + // Session should throw from the commit and then try 
and recover. + testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + try { + session.commit(); + fail("Should have timed out waiting for declare."); + } catch (JmsOperationTimedOutException jmsEx) { + } catch (Throwable error) { + fail("Should have caught an timed out exception:"); + LOG.error("Caught -> ", error); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testTransactionCommitTimesOutAndNoNextBeginTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + // Closed session should roll-back the TX with a failed discharge + testPeer.expectDischargeButDoNotRespond(txnId, false); + + // Session should throw from the commit and then try and recover. 
+ testPeer.expectDeclareButDoNotRespond(); + testPeer.expectClose(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + try { + session.commit(); + fail("Should have timed out waiting for declare."); + } catch (JmsOperationTimedOutException jmsEx) { + } catch (Throwable error) { + fail("Should have caught an timed out exception:"); + LOG.error("Caught -> ", error); + } + + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 6b6dcb5..64af655 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -868,7 +868,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { testPeer.dropAfterLastHandler(); final JmsConnection connection = establishAnonymousConnecton( - "jms.requestTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", + "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", testPeer); connection.addConnectionListener(new JmsDefaultConnectionListener() { @@ -925,7 +925,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { testPeer.dropAfterLastHandler(); final JmsConnection connection = establishAnonymousConnecton( - "jms.sendTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", + "jms.sendTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", testPeer); connection.addConnectionListener(new 
JmsDefaultConnectionListener() { @@ -986,7 +986,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { testPeer.dropAfterLastHandler(); final JmsConnection connection = establishAnonymousConnecton( - "jms.requestTimeout=2000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", + "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", testPeer); connection.addConnectionListener(new JmsDefaultConnectionListener() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e0b5980c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index d4b304e..7ffdb2e 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -20,11 +20,11 @@ package org.apache.qpid.jms.test.testpeer; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -419,7 +419,8 @@ public class TestAmqpPeer implements AutoCloseable } public void expectSaslConnect(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Symbol[] desiredCapabilities, Symbol[] serverCapabilities, - Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher, 
Matcher<?> hostnameMatcher) + Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher, + Matcher<?> hostnameMatcher, boolean deferOpened) { SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism); addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, @@ -467,12 +468,10 @@ public class TestAmqpPeer implements AutoCloseable open.setProperties(serverProperties); } - OpenMatcher openMatcher = new OpenMatcher() - .withContainerId(notNullValue(String.class)) - .onCompletion(new FrameSender( - this, FrameType.AMQP, 0, - open, - null)); + OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class)); + if (!deferOpened) { + openMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, open, null)); + } if (desiredCapabilities != null) { @@ -515,7 +514,7 @@ public class TestAmqpPeer implements AutoCloseable Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data)); - expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null); + expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false); } public void expectSaslExternalConnect() @@ -525,7 +524,7 @@ public class TestAmqpPeer implements AutoCloseable throw new IllegalStateException("need-client-cert must be enabled on the test peer"); } - expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null); + expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, false); } public void expectSaslAnonymousConnect() @@ -533,6 +532,11 @@ public class TestAmqpPeer implements AutoCloseable expectSaslAnonymousConnect(null, null); } + public void 
expectSaslAnonymousConnect(boolean deferOpened) + { + expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, deferOpened); + } + public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher) { expectSaslAnonymousConnect(idleTimeoutMatcher, hostnameMatcher, null, null); @@ -540,7 +544,7 @@ public class TestAmqpPeer implements AutoCloseable public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, Matcher<?> propertiesMatcher, Map<Symbol, Object> serverProperties) { - expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher); + expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher, false); } public void expectFailingSaslConnect(Symbol[] serverMechs, Symbol clientSelectedMech) @@ -844,9 +848,14 @@ public class TestAmqpPeer implements AutoCloseable expectSenderAttach(notNullValue(), false, false); } + public void expectSenderAttachWithoutGrantingCredit() + { + expectSenderAttach(notNullValue(), notNullValue(), false, false, false, 0, 0, null, null); + } + public void expectSenderAttach(long creditFlowDelay) { - expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, null, null); + expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, 100, null, null); } public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) @@ -861,6 +870,11 @@ public class TestAmqpPeer implements AutoCloseable public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean 
refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage) { + expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, 100, errorType, errorMessage); + } + + public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage) + { final AttachMatcher attachMatcher = new AttachMatcher() .withName(notNullValue()) .withHandle(notNullValue()) @@ -942,7 +956,7 @@ public class TestAmqpPeer implements AutoCloseable .setIncomingWindow(UnsignedInteger.valueOf(2048)) .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded .setOutgoingWindow(UnsignedInteger.valueOf(2048)) - .setLinkCredit(UnsignedInteger.valueOf(100)); + .setLinkCredit(UnsignedInteger.valueOf(creditAmount)); // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. 
final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); @@ -1476,6 +1490,11 @@ public class TestAmqpPeer implements AutoCloseable return payloadData.encode(); } + public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher) + { + expectTransfer(expectedPayloadMatcher, nullValue(), false, false, null, false); + } + public void expectTransfer(Matcher<Binary> expectedPayloadMatcher) { expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true); @@ -1538,6 +1557,14 @@ public class TestAmqpPeer implements AutoCloseable expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); } + public void expectDeclareButDoNotRespond() + { + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + + expectTransfer(declareMatcher, nullValue(), false, false, null, false); + } + public void expectDischarge(Binary txnId, boolean dischargeState) { expectDischarge(txnId, dischargeState, new Accepted()); } @@ -1555,6 +1582,19 @@ public class TestAmqpPeer implements AutoCloseable expectTransfer(dischargeMatcher, nullValue(), false, responseState, true); } + public void expectDischargeButDoNotRespond(Binary txnId, boolean dischargeState) { + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // but do not send any response disposition for it. 
+ Discharge discharge = new Discharge(); + discharge.setFail(dischargeState); + discharge.setTxnId(txnId); + + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + + expectTransfer(dischargeMatcher, nullValue(), false, false, null, true); + } + public void remotelyCloseLastCoordinatorLink() { remotelyCloseLastCoordinatorLink(true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed."); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
