Add a test where the drain attempt uses all the available credit and, as a result, no flow response is sent
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/58616e84 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/58616e84 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/58616e84 Branch: refs/heads/master Commit: 58616e84bc6faaeaa89a628aa3486f51285d16fc Parents: b5e981c Author: Robert Gemmell <[email protected]> Authored: Tue Dec 9 15:50:19 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Dec 9 15:50:19 2014 +0000 ---------------------------------------------------------------------- .../jms/integration/SessionIntegrationTest.java | 70 ++++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 2 +- 2 files changed, 71 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58616e84/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index b527589..21e8285 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -614,6 +614,76 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout=5000) + public void testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + int messageCount = 5; + ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount); + connection.start(); + + 
testPeer.expectBegin(true); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // Create a consumer, expect it to flow credit, but don't send it any messages + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + + session.createConsumer(queue); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.expectSenderAttach(); + MessageProducer producer = session.createProducer(queue); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); + + // Expect the message which provoked creating the transaction + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true)); + testPeer.expectTransfer(messageMatcher); //TODO: check it is marked as being in the transaction + + producer.send(session.createMessage()); + + // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' Flow. 
+ // Action the drain by filling the prefetch (which is equivalent to this having happened while + // the Flow was in flight to the peer), and then DONT send a flow frame back to the client + // as it can tell from the messages that all the credit has been used. + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), + messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); + + // Expect the messages that were not consumed to be released + for (int i = 1; i <= messageCount; i++) { + testPeer.expectDisposition(true, new ReleasedMatcher()); + } + + // Expect the consumer to be 'started' again as rollback completes + testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount))); + + session.rollback(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout=5000) public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { Connection connection = testFixture.establishConnecton(testPeer); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58616e84/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index cca5ab5..efac92c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -784,7 +784,6 @@ public class TestAmqpPeer implements AutoCloseable Binary dtag = new Binary(tagString.getBytes()); final TransferFrame transferResponse = new TransferFrame() - .setHandle(UnsignedInteger.valueOf(_nextLinkHandle - 1)) // TODO: this needs to be the value used in the attach response .setDeliveryId(UnsignedInteger.valueOf(nextId)) .setDeliveryTag(dtag) .setMessageFormat(UnsignedInteger.ZERO) @@ -800,6 +799,7 @@ public class TestAmqpPeer implements AutoCloseable @Override public void setValues() { + transferResponse.setHandle(calculateLinkHandle(flowMatcher)); transferResponseSender.setChannel(flowMatcher.getActualChannel()); } }); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
