Add a brokerless integration test for committing a transacted session with a consumer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/92112063 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/92112063 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/92112063 Branch: refs/heads/master Commit: 92112063a48a62032da0811aadf5deb86749572d Parents: e13dfc4 Author: Robert Gemmell <[email protected]> Authored: Mon Nov 17 14:17:06 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Nov 17 14:17:06 2014 +0000 ---------------------------------------------------------------------- .../jms/integration/SessionIntegrationTest.java | 63 ++++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 1 - 2 files changed, 63 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/92112063/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 5a4a754..498af7f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -31,19 +32,31 @@ import java.io.IOException; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; 
import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.DescriptorMatcher; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; +import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; +import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.proton.amqp.Binary; import org.junit.Test; public class SessionIntegrationTest extends QpidJmsTestCase { @@ -311,4 +324,54 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout=5000) + public void testCommitTransactedSessionWithConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(true); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + Session session = 
connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content")); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); + + // Then expect a TransactionalState disposition for the applications message once consumed + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL)); + testPeer.expectDisposition(false, stateMatcher); + + MessageConsumer messageConsumer = session.createConsumer(queue); + Message receivedMessage = messageConsumer.receive(1000); + + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the commit succeeded + Discharge discharge = new Discharge(); + discharge.setFail(false); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); + + session.commit(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/92112063/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index c0f2bc2..adc4fe9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -71,7 +71,6 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedShort; -import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.codec.Data; import org.apache.qpid.proton.engine.impl.AmqpHeader; import org.hamcrest.Matcher; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
