Repository: qpid-jms Updated Branches: refs/heads/master 470c87aeb -> 7188d9c76
Implement proper durable topic unsubscribe. Also ensures that for a durable subscriber we detach and not close when the MessageConsumer is closed. Tests will continue to work with ActiveMQ as it removes subscriptions on the first reattach attempt and does not currently do a remove on close. Some of the AmqpResource code is a bit messy now and needs to be refactored a bit after this change. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7188d9c7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7188d9c7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7188d9c7 Branch: refs/heads/master Commit: 7188d9c76d12f99e0a49ffa56d13e6cbcbc9c131 Parents: 470c87a Author: Timothy Bish <[email protected]> Authored: Wed Oct 29 16:51:31 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Oct 29 16:51:31 2014 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpAbstractResource.java | 21 ++- .../provider/amqp/AmqpAnonymousProducer.java | 8 - .../qpid/jms/provider/amqp/AmqpConnection.java | 14 +- .../provider/amqp/AmqpConnectionSession.java | 147 +++---------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 6 + .../jms/provider/amqp/AmqpFixedProducer.java | 5 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 8 + .../qpid/jms/provider/amqp/AmqpSession.java | 21 +++ .../provider/amqp/AmqpTemporaryDestination.java | 4 + .../provider/amqp/AmqpTransactionContext.java | 5 +- .../transactions/JmsTransactedConsumerTest.java | 24 +++ 11 files changed, 115 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 28e16b6..6def062 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -76,7 +76,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp this.openRequest = request; doOpen(); this.endpoint.setContext(this); - this.endpoint.open(); } @Override @@ -107,7 +106,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp this.closeRequest = request; doClose(); - this.endpoint.close(); } @Override @@ -254,8 +252,21 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp public void processFlowUpdates() throws IOException { } - protected abstract void doOpen(); - - protected abstract void doClose(); + /** + * Perform the open operation on the managed endpoint. A subclass may + * override this method to provide additional open actions or configuration + * updates. + */ + protected void doOpen() { + endpoint.open(); + } + /** + * Perform the close operation on the managed endpoint. A subclass may + * override this method to provide additional close actions or alter the + * standard close path such as endpoint detach etc. + */ + protected void doClose() { + endpoint.close(); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java index e7dceb0..69bfdf7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java @@ -115,14 +115,6 @@ public class AmqpAnonymousProducer extends AmqpProducer { } @Override - protected void doOpen() { - } - - @Override - protected void doClose() { - } - - @Override public boolean isAnonymous() { return true; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 448556a..8d5a458 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; +import javax.jms.JMSException; import javax.jms.JMSSecurityException; import javax.jms.Session; @@ -95,10 +96,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn protected void doOpen() { this.endpoint.setContainer(resource.getClientId()); this.endpoint.setHostname(remoteURI.getHost()); - } - - @Override - protected void doClose() { + super.doOpen(); } public AmqpSession createSession(JmsSessionInfo sessionInfo) { @@ -112,6 +110,14 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } public void unsubscribe(String subscriptionName, AsyncResult request) { + + for (AmqpSession session : sessions.values()) { + if (session.containsSubscription(subscriptionName)) { + request.onFailure(new JMSException("Cannot remove an active durable subscription")); + return; + } + } + connectionSession.unsubscribe(subscriptionName, request); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java index eaddc17..3cfc0c6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java @@ -16,17 +16,16 @@ */ package org.apache.qpid.jms.provider.amqp; -import java.io.IOException; import java.util.HashMap; import java.util.Map; +import javax.jms.JMSException; + import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.NoOpAsyncResult; import org.apache.qpid.jms.provider.WrappedAsyncResult; -import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.messaging.TerminusDurability; -import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Receiver; @@ -64,33 +63,28 @@ public class AmqpConnectionSession extends AmqpSession { * the request that awaits the completion of this action. */ public void unsubscribe(String subscriptionName, AsyncResult request) { - SubscriptionSourceRequestor requestor = new SubscriptionSourceRequestor(getJmsResource(), subscriptionName); - SubscriptionSourceRequest sourceRequest = new SubscriptionSourceRequest(requestor, request); - pendingUnsubs.put(subscriptionName, sourceRequest); + DurableSubscriptionReattach subscriber = new DurableSubscriptionReattach(getJmsResource(), subscriptionName); + DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(subscriber, request); + pendingUnsubs.put(subscriptionName, subscribeRequest); LOG.debug("Attempting remove of subscription: {}", subscriptionName); - requestor.open(sourceRequest); + subscriber.open(subscribeRequest); } - private class SubscriptionSourceRequestor extends AmqpAbstractResource<JmsSessionInfo, Receiver> { - + private class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> { private final String subscriptionName; - public SubscriptionSourceRequestor(JmsSessionInfo resource, String subscriptionName) { - super(resource); + public DurableSubscriptionReattach(JmsSessionInfo resource, String subscriptionName) { + super(resource, AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName)); this.subscriptionName = subscriptionName; } @Override protected void doOpen() { - endpoint = AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName); endpoint.setTarget(new Target()); endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); - } - - @Override - protected void doClose() { + super.doOpen(); } public String getSubscriptionName() { @@ -98,125 +92,32 @@ public class AmqpConnectionSession extends AmqpSession { } } - private class SubscriptionSourceRequest extends WrappedAsyncResult { + private class DurableSubscriptionReattachRequest extends WrappedAsyncResult { - private final SubscriptionSourceRequestor requestor; + private final DurableSubscriptionReattach subscriber; - public SubscriptionSourceRequest(SubscriptionSourceRequestor requestor, AsyncResult originalRequest) { + public DurableSubscriptionReattachRequest(DurableSubscriptionReattach subscriber, AsyncResult originalRequest) { super(originalRequest); - this.requestor = requestor; + this.subscriber = subscriber; } @Override public void onSuccess() { - Object result = requestor.getEndpoint().getRemoteSource(); - if (result == null || !(result instanceof Source)) { - LOG.trace("No Source returned for subscription: {}", requestor.getSubscriptionName()); - pendingUnsubs.remove(requestor.getSubscriptionName()); - requestor.closed(); - super.onFailure(new IOException("Could not fetch remote subscription information")); - } else { - final Source remoteSource = (Source) result; - LOG.trace("Source returned for subscription: {} closing first stage", requestor.getSubscriptionName()); - requestor.close(new AsyncResult() { - - @Override - public void onSuccess() { - RemoveDurabilityRequestor removeRequestor = - new RemoveDurabilityRequestor(getJmsResource(), requestor.getSubscriptionName(), remoteSource); - RemoveDurabilityRequest removeRequest = new RemoveDurabilityRequest(removeRequestor, getWrappedRequest()); - pendingUnsubs.put(requestor.getSubscriptionName(), removeRequest); - LOG.trace("Second stage remove started for subscription: {}", requestor.getSubscriptionName()); - removeRequestor.open(removeRequest); - } - - @Override - public void onFailure(Throwable result) { - LOG.trace("Second stage remove failed for subscription: {}", requestor.getSubscriptionName()); - pendingUnsubs.remove(requestor.getSubscriptionName()); - getWrappedRequest().onFailure(result); - } - - @Override - public boolean isComplete() { - return getWrappedRequest().isComplete(); - } - }); - } - } - - @Override - public void onFailure(Throwable result) { - pendingUnsubs.remove(requestor.getSubscriptionName()); - requestor.closed(); - super.onFailure(result); - } - } - - private class RemoveDurabilityRequestor extends AmqpAbstractResource<JmsSessionInfo, Receiver> { - - private final String subscriptionName; - private final Source subscriptionSource; - - public RemoveDurabilityRequestor(JmsSessionInfo resource, String subscriptionName, Source subscriptionSource) { - super(resource); - this.subscriptionSource = subscriptionSource; - this.subscriptionName = subscriptionName; - } - - @Override - protected void doOpen() { - endpoint = AmqpConnectionSession.this.getProtonSession().receiver(subscriptionName); - - subscriptionSource.setDurable(TerminusDurability.NONE); - subscriptionSource.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); - - endpoint.setSource(subscriptionSource); - endpoint.setTarget(new Target()); - endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); - endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); - } - - @Override - public void remotelyClosed() { - if (isAwaitingOpen()) { - openRequest.onSuccess(); + LOG.trace("Reattached to subscription: {}", subscriber.getSubscriptionName()); + pendingUnsubs.remove(subscriber.getSubscriptionName()); + if (subscriber.getEndpoint().getRemoteSource() != null) { + subscriber.close(getWrappedRequest()); } else { - closed(); - AmqpConnectionSession.this.reportError(new IOException("Durable unsubscribe failed unexpectedly")); + subscriber.close(NoOpAsyncResult.INSTANCE); + getWrappedRequest().onFailure(new JMSException("Cannot remove a subscription that does not exist")); } } @Override - protected void doClose() { - } - - public String getSubscriptionName() { - return subscriptionName; - } - } - - private class RemoveDurabilityRequest extends WrappedAsyncResult { - - private final RemoveDurabilityRequestor requestor; - - public RemoveDurabilityRequest(RemoveDurabilityRequestor requestor, AsyncResult originalRequest) { - super(originalRequest); - this.requestor = requestor; - } - - @Override - public void onSuccess() { - LOG.trace("Second stage remove complete for subscription: {}", requestor.getSubscriptionName()); - pendingUnsubs.remove(requestor.getSubscriptionName()); - requestor.close(getWrappedRequest()); - } - - @Override public void onFailure(Throwable result) { - LOG.trace("Second stage remove failed for subscription: {}", requestor.getSubscriptionName()); - pendingUnsubs.remove(requestor.getSubscriptionName()); - requestor.closed(); + LOG.trace("Failed to reattach to subscription: {}", subscriber.getSubscriptionName()); + pendingUnsubs.remove(subscriber.getSubscriptionName()); + subscriber.closed(); super.onFailure(result); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 3caec3c..cb766bf 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -119,6 +119,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); } endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + super.doOpen(); } @Override @@ -346,6 +347,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver @Override protected void doClose() { + if (resource.isDurable()) { + this.endpoint.detach(); + } else { + this.endpoint.close(); + } } public AmqpConnection getConnection() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 56ed79f..8174817 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -258,10 +258,7 @@ public class AmqpFixedProducer extends AmqpProducer { endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); } endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); - } - - @Override - protected void doClose() { + super.doOpen(); } public AmqpSession getSession() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 200a367..c90c773 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -698,7 +698,15 @@ public class AmqpProvider extends AbstractProvider implements TransportListener session.processStateChange(); break; case LINK_REMOTE_CLOSE: + LOG.info("Link closed: {}", protonEvent.getLink().getContext()); + AmqpResource cloedResource = (AmqpResource) protonEvent.getLink().getContext(); + cloedResource.processStateChange(); + break; case LINK_REMOTE_DETACH: + LOG.info("Link detach: {}", protonEvent.getLink().getContext()); + AmqpResource detachedResource = (AmqpResource) protonEvent.getLink().getContext(); + detachedResource.processStateChange(); + break; case LINK_REMOTE_OPEN: AmqpResource resource = (AmqpResource) protonEvent.getLink().getContext(); resource.processStateChange(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 39c9f9d..ca28165 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -69,11 +69,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { protected void doOpen() { this.endpoint.setIncomingCapacity(Integer.MAX_VALUE); this.connection.addSession(this); + super.doOpen(); } @Override protected void doClose() { this.connection.removeSession(this); + super.doClose(); } /** @@ -248,6 +250,25 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { } /** + * Query the Session to see if there are any registered consumer instances that have + * a durable subscription with the given subscription name. + * + * @param subscriptionName + * the name of the subscription being searched for. + * + * @return true if there is a consumer that has the given subscription. + */ + public boolean containsSubscription(String subscriptionName) { + for (AmqpConsumer consumer : consumers.values()) { + if (subscriptionName.equals(consumer.getJmsResource().getSubscriptionName())) { + return true; + } + } + + return false; + } + + /** * Call to send an error that occurs outside of the normal asynchronous processing * of a session resource such as a remote close etc. * http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java index 3c6893d..2cf1cb9 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java @@ -107,11 +107,15 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); this.connection.addTemporaryDestination(this); + + super.doOpen(); } @Override protected void doClose() { this.connection.removeTemporaryDestination(this); + + super.doClose(); } public AmqpConnection getConnection() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 6bfbaa5..5492976 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -140,10 +140,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, endpoint.setTarget(coordinator); endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); - } - - @Override - protected void doClose() { + super.doOpen(); } public void begin(JmsTransactionId txId, AsyncResult request) throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7188d9c7/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index 49121ad..cfe6328 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; @@ -71,6 +72,29 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { assertEquals(0, proxy.getQueueSize()); } + @Test(timeout=30000) + public void testRollbackRececeivedMessageAndClose() throws Exception { + + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("TestMessage-0")); + producer.close(); + session.commit(); + + MessageConsumer consumer = session.createConsumer(queue); + + Message msg = consumer.receive(2000); + assertNotNull(msg); + + session.rollback(); + + connection.close(); + } + @Test(timeout = 60000) public void testReceiveAndRollback() throws Exception { connection = createAmqpConnection(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
