Repository: qpid-jms Updated Branches: refs/heads/master f5b8f9fc2 -> 0fd981ca1
QPIDJMS-125 Fix an potential NPE when a TX Discharge fails with rejected that has a null error. Fix issue with auto-starting new TX after commit or rollback fails. Adds some tests to cover the above plus moves TX tests into their own JUnit test. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fd981ca Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fd981ca Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fd981ca Branch: refs/heads/master Commit: 0fd981ca1397607362c0c89920017fded813cec1 Parents: f5b8f9f Author: Timothy Bish <[email protected]> Authored: Tue Oct 13 17:06:59 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Oct 13 17:06:59 2015 -0400 ---------------------------------------------------------------------- .../qpid/jms/JmsLocalTransactionContext.java | 2 + .../provider/amqp/AmqpTransactionContext.java | 4 +- .../jms/integration/SessionIntegrationTest.java | 412 ----------- .../TransactionsIntegrationTest.java | 720 +++++++++++++++++++ 4 files changed, 724 insertions(+), 414 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java index daee929..7d60fa7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -183,6 +183,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void onPendingFailure(Throwable cause) { + reset(); } }); @@ -235,6 +236,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { @Override public void onPendingFailure(Throwable cause) { + reset(); } }); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index a5e9284..648b896 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -98,8 +98,8 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, LOG.debug("Last TX request failed: {}", current.getProviderHint()); pendingDelivery.settle(); Rejected rejected = (Rejected) state; - TransactionRolledBackException ex = - new TransactionRolledBackException(rejected.getError().getDescription()); + Exception cause = AmqpSupport.convertToException(rejected.getError()); + TransactionRolledBackException ex = new TransactionRolledBackException(cause.getMessage()); AsyncResult request = this.pendingRequest; this.current = null; this.pendingRequest = null; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 4daec4b..9fc563f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.jms.integration; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; @@ -66,17 +65,12 @@ import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; -import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; -import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; -import org.apache.qpid.jms.test.testpeer.describedtypes.Released; -import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType; import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher; -import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher; import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; @@ -952,78 +946,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout=20000) - public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception { - doCommitTransactedSessionWithConsumerTestImpl(1, 1); - } - - @Test(timeout=20000) - public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { - doCommitTransactedSessionWithConsumerTestImpl(5, 2); - } - - private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - connection.start(); - - testPeer.expectBegin(); - CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); - testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - testPeer.expectReceiverAttach(); - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); - - for (int i = 1; i <= consumeCount; i++) { - // Then expect an *settled* TransactionalState disposition for each message once received by the consumer - TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); - stateMatcher.withTxnId(equalTo(txnId)); - stateMatcher.withOutcome(new AcceptedMatcher()); - - testPeer.expectDisposition(true, stateMatcher); - } - - MessageConsumer messageConsumer = session.createConsumer(queue); - - for (int i = 1; i <= consumeCount; i++) { - Message receivedMessage = messageConsumer.receive(3000); - - assertNotNull(receivedMessage); - assertTrue(receivedMessage instanceof TextMessage); - } - - // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, - // and reply with accepted and settled disposition to indicate the commit succeeded - Discharge discharge = new Discharge(); - discharge.setFail(false); - discharge.setTxnId(txnId); - TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); - dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); - testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); - - // Then expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); - declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - session.commit(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout=20000) public void testIncomingMessageExceedsMaxRedeliveries() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { final int COUNT = 5; @@ -1070,340 +992,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } } - @Test(timeout=20000) - public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - connection.start(); - - testPeer.expectBegin(); - CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); - testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a Declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - // Create a producer to use in provoking creation of the AMQP transaction - testPeer.expectSenderAttach(); - MessageProducer producer = session.createProducer(queue); - - // Expect the message which provoked creating the transaction. Check it carries - // TransactionalState with the above txnId but has no outcome. Respond with a - // TransactionalState with Accepted outcome. - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); - messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); - - TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); - stateMatcher.withTxnId(equalTo(txnId)); - stateMatcher.withOutcome(nullValue()); - - TransactionalState txState = new TransactionalState(); - txState.setTxnId(txnId); - txState.setOutcome(new Accepted()); - - testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); - - producer.send(session.createMessage()); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout=20000) - public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception { - doRollbackTransactedSessionWithConsumerTestImpl(1, 1); - } - - @Test(timeout=20000) - public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { - doRollbackTransactedSessionWithConsumerTestImpl(5, 2); - } - - private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - connection.start(); - - testPeer.expectBegin(); - CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); - testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - testPeer.expectReceiverAttach(); - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); - - for (int i = 1; i <= consumeCount; i++) { - // Then expect a *settled* TransactionalState disposition for each message once received by the consumer - TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); - stateMatcher.withTxnId(equalTo(txnId)); - stateMatcher.withOutcome(new AcceptedMatcher()); - - testPeer.expectDisposition(true, stateMatcher); - } - - MessageConsumer messageConsumer = session.createConsumer(queue); - - for (int i = 1; i <= consumeCount; i++) { - Message receivedMessage = messageConsumer.receive(3000); - - assertNotNull(receivedMessage); - assertTrue(receivedMessage instanceof TextMessage); - } - - // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' - testPeer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO)); - - // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, - // and reply with accepted and settled disposition to indicate the rollback succeeded - Discharge discharge = new Discharge(); - discharge.setFail(true); - discharge.setTxnId(txnId); - TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); - dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); - testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); - - // Then expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - // Expect the messages that were not consumed to be released - int unconsumed = transferCount - consumeCount; - for (int i = 1; i <= unconsumed; i++) { - testPeer.expectDisposition(true, new ReleasedMatcher()); - } - - // Expect the consumer to be 'started' again as rollback completes - testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO)); - - session.rollback(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout=20000) - public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - int messageCount = 5; - ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); - connection.start(); - - testPeer.expectBegin(); - CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); - testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - // Create a consumer and fill the prefetch with messages, which we wont consume any of - testPeer.expectReceiverAttach(); - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount); - - session.createConsumer(queue); - - // Create a producer to use in provoking creation of the AMQP transaction - testPeer.expectSenderAttach(); - MessageProducer producer = session.createProducer(queue); - - // Expect the message which provoked creating the transaction - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); - messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); - TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); - stateMatcher.withTxnId(equalTo(txnId)); - stateMatcher.withOutcome(nullValue()); - - TransactionalState txState = new TransactionalState(); - txState.setTxnId(txnId); - txState.setOutcome(new Accepted()); - - testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); - - producer.send(session.createMessage()); - - // The consumer will be 'stopped' prior to rollback, however we will NOT send a 'drain' Flow - // frame as we have manipulated that all the credit was already used, i.e. it already stopped. - - // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, - // and reply with accepted and settled disposition to indicate the rollback succeeded - Discharge discharge = new Discharge(); - discharge.setFail(true); - discharge.setTxnId(txnId); - TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); - dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); - testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); - - // Now expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - // Expect the messages that were not consumed to be released - for (int i = 1; i <= messageCount; i++) { - testPeer.expectDisposition(true, new ReleasedMatcher()); - } - - // Expect the consumer to be 'started' again as rollback completes - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); - - session.rollback(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout=20000) - public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - int messageCount = 5; - ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); - connection.start(); - - testPeer.expectBegin(); - CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); - testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue("myQueue"); - - // Create a consumer, expect it to flow credit, but don't send it any messages - testPeer.expectReceiverAttach(); - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); - - session.createConsumer(queue); - - // Create a producer to use in provoking creation of the AMQP transaction - testPeer.expectSenderAttach(); - MessageProducer producer = session.createProducer(queue); - - // Expect the message which provoked creating the transaction - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); - messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); - - TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); - stateMatcher.withTxnId(equalTo(txnId)); - stateMatcher.withOutcome(nullValue()); - - TransactionalState txState = new TransactionalState(); - txState.setTxnId(txnId); - txState.setOutcome(new Accepted()); - - testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); - - producer.send(session.createMessage()); - - // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' Flow. - // Action the drain by filling the prefetch (which is equivalent to this having happened while - // the Flow was in flight to the peer), and then DONT send a flow frame back to the client - // as it can tell from the messages that all the credit has been used. - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), - messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false); - - // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, - // and reply with accepted and settled disposition to indicate the rollback succeeded - Discharge discharge = new Discharge(); - discharge.setFail(true); - discharge.setTxnId(txnId); - TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); - dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); - testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); - - // Then expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - // Expect the messages that were not consumed to be released - for (int i = 1; i <= messageCount; i++) { - testPeer.expectDisposition(true, new ReleasedMatcher()); - } - - // Expect the consumer to be 'started' again as rollback completes - testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); - - session.rollback(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout=20000) - public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer); - connection.start(); - - testPeer.expectBegin(); - CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); - testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); - - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a declared disposition state containing the txnId. - Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); - declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); - testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - String queueName = "myQueue"; - Queue queue = session.createQueue(queueName); - - SourceMatcher sourceMatcher = new SourceMatcher(); - sourceMatcher.withAddress(equalTo(queueName)); - sourceMatcher.withDynamic(equalTo(false)); - sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL)); - ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue()); - sourceMatcher.withDefaultOutcome(outcomeMatcher); - - testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); - testPeer.expectLinkFlow(); - - session.createConsumer(queue); - - testPeer.waitForAllHandlersToComplete(1000); - } - } @Test(timeout=20000) public void testPrefetchPolicyInfluencesCreditFlow() throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java new file mode 100644 index 0000000..947700b --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TransactionRolledBackException; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; +import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; +import org.apache.qpid.jms.test.testpeer.describedtypes.Error; +import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; +import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; +import org.apache.qpid.jms.test.testpeer.describedtypes.Released; +import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; +import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.junit.Test; + +/** + * Tests for behavior of Transacted Session operations. + */ +public class TransactionsIntegrationTest extends QpidJmsTestCase { + + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + @Test(timeout=20000) + public void testTransactionCommitFailWithEmptyRejectedDisposition() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a Declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with rejected and settled disposition to indicate the commit failed + Discharge discharge = new Discharge(); + discharge.setFail(false); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Rejected(), true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + try { + session.commit(); + fail("Commit operation should have failed."); + } catch (TransactionRolledBackException jmsTxRb) { + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testProducedMessagesAfterCommitOfSentMessagesFails() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a Declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with rejected and settled disposition to indicate the commit failed + Discharge discharge = new Discharge(); + discharge.setFail(false); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error")); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + try { + session.commit(); + fail("Commit operation should have failed."); + } catch (TransactionRolledBackException jmsTxRb) { + } + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testProducedMessagesAfterRollbackSentMessagesFails() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a Declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with rejected and settled disposition to indicate the rollback failed + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error")); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + try { + session.rollback(); + fail("Rollback operation should have failed."); + } catch (JMSException jmsex) { + } + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception { + doCommitTransactedSessionWithConsumerTestImpl(1, 1); + } + + @Test(timeout=20000) + public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { + doCommitTransactedSessionWithConsumerTestImpl(5, 2); + } + + private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); + + for (int i = 1; i <= consumeCount; i++) { + // Then expect an *settled* TransactionalState disposition for each message once received by the consumer + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(new AcceptedMatcher()); + + testPeer.expectDisposition(true, stateMatcher); + } + + MessageConsumer messageConsumer = session.createConsumer(queue); + + for (int i = 1; i <= consumeCount; i++) { + Message receivedMessage = messageConsumer.receive(3000); + + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + } + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the commit succeeded + Discharge discharge = new Discharge(); + discharge.setFail(false); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + session.commit(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testProducedMessagesOnTransactedSessionCarryTxnId() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a Declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception { + doRollbackTransactedSessionWithConsumerTestImpl(1, 1); + } + + @Test(timeout=20000) + public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { + doRollbackTransactedSessionWithConsumerTestImpl(5, 2); + } + + private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); + + for (int i = 1; i <= consumeCount; i++) { + // Then expect a *settled* TransactionalState disposition for each message once received by the consumer + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(new AcceptedMatcher()); + + testPeer.expectDisposition(true, stateMatcher); + } + + MessageConsumer messageConsumer = session.createConsumer(queue); + + for (int i = 1; i <= consumeCount; i++) { + Message receivedMessage = messageConsumer.receive(3000); + + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + } + + // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' + testPeer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO)); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + // Expect the messages that were not consumed to be released + int unconsumed = transferCount - consumeCount; + for (int i = 1; i <= unconsumed; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher()); + } + + // Expect the consumer to be 'started' again as rollback completes + testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO)); + + session.rollback(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + int messageCount = 5; + ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a consumer and fill the prefetch with messages, which we wont consume any of + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount); + + session.createConsumer(queue); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // Expect the message which provoked creating the transaction + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + // The consumer will be 'stopped' prior to rollback, however we will NOT send a 'drain' Flow + // frame as we have manipulated that all the credit was already used, i.e. it already stopped. + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + + // Now expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + // Expect the messages that were not consumed to be released + for (int i = 1; i <= messageCount; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher()); + } + + // Expect the consumer to be 'started' again as rollback completes + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + + session.rollback(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + int messageCount = 5; + ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a consumer, expect it to flow credit, but don't send it any messages + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + + session.createConsumer(queue); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // Expect the message which provoked creating the transaction + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); + + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + + producer.send(session.createMessage()); + + // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' Flow. + // Action the drain by filling the prefetch (which is equivalent to this having happened while + // the Flow was in flight to the peer), and then DONT send a flow frame back to the client + // as it can tell from the messages that all the credit has been used. + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), + messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + // Expect the messages that were not consumed to be released + for (int i = 1; i <= messageCount; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher()); + } + + // Expect the consumer to be 'started' again as rollback completes + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + + session.rollback(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=20000) + public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + + SourceMatcher sourceMatcher = new SourceMatcher(); + sourceMatcher.withAddress(equalTo(queueName)); + sourceMatcher.withDynamic(equalTo(false)); + sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL)); + ModifiedMatcher outcomeMatcher = new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue()); + sourceMatcher.withDefaultOutcome(outcomeMatcher); + + testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); + testPeer.expectLinkFlow(); + + session.createConsumer(queue); + + testPeer.waitForAllHandlersToComplete(1000); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
