Repository: qpid-jms
Updated Branches:
  refs/heads/master f5b8f9fc2 -> 0fd981ca1


QPIDJMS-125 Fix an potential NPE when a TX Discharge fails with rejected
that has a null error.  Fix issue with auto-starting new TX after commit
or rollback fails.  Adds some tests to cover the above plus moves TX
tests into their own JUnit test.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fd981ca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fd981ca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fd981ca

Branch: refs/heads/master
Commit: 0fd981ca1397607362c0c89920017fded813cec1
Parents: f5b8f9f
Author: Timothy Bish <[email protected]>
Authored: Tue Oct 13 17:06:59 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Oct 13 17:06:59 2015 -0400

----------------------------------------------------------------------
 .../qpid/jms/JmsLocalTransactionContext.java    |   2 +
 .../provider/amqp/AmqpTransactionContext.java   |   4 +-
 .../jms/integration/SessionIntegrationTest.java | 412 -----------
 .../TransactionsIntegrationTest.java            | 720 +++++++++++++++++++
 4 files changed, 724 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index daee929..7d60fa7 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -183,6 +183,7 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
                         @Override
                         public void onPendingFailure(Throwable cause) {
+                            reset();
                         }
                     });
 
@@ -235,6 +236,7 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
 
                     @Override
                     public void onPendingFailure(Throwable cause) {
+                        reset();
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index a5e9284..648b896 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -98,8 +98,8 @@ public class AmqpTransactionContext extends 
AmqpAbstractResource<JmsSessionInfo,
                     LOG.debug("Last TX request failed: {}", 
current.getProviderHint());
                     pendingDelivery.settle();
                     Rejected rejected = (Rejected) state;
-                    TransactionRolledBackException ex =
-                        new 
TransactionRolledBackException(rejected.getError().getDescription());
+                    Exception cause = 
AmqpSupport.convertToException(rejected.getError());
+                    TransactionRolledBackException ex = new 
TransactionRolledBackException(cause.getMessage());
                     AsyncResult request = this.pendingRequest;
                     this.current = null;
                     this.pendingRequest = null;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 4daec4b..9fc563f 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -21,7 +21,6 @@ package org.apache.qpid.jms.integration;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
@@ -66,17 +65,12 @@ import 
org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
-import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
 import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
@@ -952,78 +946,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout=20000)
-    public void testCommitTransactedSessionWithConsumerReceivingAllMessages() 
throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(1, 1);
-    }
-
-    @Test(timeout=20000)
-    public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() 
throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(5, 2);
-    }
-
-    private void doCommitTransactedSessionWithConsumerTestImpl(int 
transferCount, int consumeCount) throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            connection.start();
-
-            testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 
3, (byte) 4});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
-
-            for (int i = 1; i <= consumeCount; i++) {
-                // Then expect an *settled* TransactionalState disposition for 
each message once received by the consumer
-                TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
-                stateMatcher.withTxnId(equalTo(txnId));
-                stateMatcher.withOutcome(new AcceptedMatcher());
-
-                testPeer.expectDisposition(true, stateMatcher);
-            }
-
-            MessageConsumer messageConsumer = session.createConsumer(queue);
-
-            for (int i = 1; i <= consumeCount; i++) {
-                Message receivedMessage = messageConsumer.receive(3000);
-
-                assertNotNull(receivedMessage);
-                assertTrue(receivedMessage instanceof TextMessage);
-            }
-
-            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
-            // and reply with accepted and settled disposition to indicate the 
commit succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(false);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
-
-            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, 
(byte) 4});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            session.commit();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout=20000)
     public void testIncomingMessageExceedsMaxRedeliveries() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             final int COUNT = 5;
@@ -1070,340 +992,6 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
-    @Test(timeout=20000)
-    public void testProducedMessagesOnTransactedSessionCarryTxnId() throws 
Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            connection.start();
-
-            testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a Declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            // Create a producer to use in provoking creation of the AMQP 
transaction
-            testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
-
-            // Expect the message which provoked creating the transaction. 
Check it carries
-            // TransactionalState with the above txnId but has no outcome. 
Respond with a
-            // TransactionalState with Accepted outcome.
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
-            messageMatcher.setMessageAnnotationsMatcher( new 
MessageAnnotationsSectionMatcher(true));
-
-            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
-            stateMatcher.withTxnId(equalTo(txnId));
-            stateMatcher.withOutcome(nullValue());
-
-            TransactionalState txState = new TransactionalState();
-            txState.setTxnId(txnId);
-            txState.setOutcome(new Accepted());
-
-            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
-
-            producer.send(session.createMessage());
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout=20000)
-    public void 
testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws 
Exception {
-        doRollbackTransactedSessionWithConsumerTestImpl(1, 1);
-    }
-
-    @Test(timeout=20000)
-    public void 
testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws 
Exception {
-        doRollbackTransactedSessionWithConsumerTestImpl(5, 2);
-    }
-
-    private void doRollbackTransactedSessionWithConsumerTestImpl(int 
transferCount, int consumeCount) throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            connection.start();
-
-            testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
-
-            for (int i = 1; i <= consumeCount; i++) {
-                // Then expect a *settled* TransactionalState disposition for 
each message once received by the consumer
-                TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
-                stateMatcher.withTxnId(equalTo(txnId));
-                stateMatcher.withOutcome(new AcceptedMatcher());
-
-                testPeer.expectDisposition(true, stateMatcher);
-            }
-
-            MessageConsumer messageConsumer = session.createConsumer(queue);
-
-            for (int i = 1; i <= consumeCount; i++) {
-                Message receivedMessage = messageConsumer.receive(3000);
-
-                assertNotNull(receivedMessage);
-                assertTrue(receivedMessage instanceof TextMessage);
-            }
-
-            // Expect the consumer to be 'stopped' prior to rollback by 
issuing a 'drain'
-            testPeer.expectLinkFlow(true, true, 
greaterThan(UnsignedInteger.ZERO));
-
-            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
-            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
-
-            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            // Expect the messages that were not consumed to be released
-            int unconsumed = transferCount - consumeCount;
-            for (int i = 1; i <= unconsumed; i++) {
-                testPeer.expectDisposition(true, new ReleasedMatcher());
-            }
-
-            // Expect the consumer to be 'started' again as rollback completes
-            testPeer.expectLinkFlow(false, false, 
greaterThan(UnsignedInteger.ZERO));
-
-            session.rollback();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout=20000)
-    public void 
testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws 
Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            int messageCount = 5;
-            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
-            connection.start();
-
-            testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            // Create a consumer and fill the prefetch with messages, which we 
wont consume any of
-            testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), messageCount);
-
-            session.createConsumer(queue);
-
-            // Create a producer to use in provoking creation of the AMQP 
transaction
-            testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
-
-            // Expect the message which provoked creating the transaction
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
-            messageMatcher.setMessageAnnotationsMatcher( new 
MessageAnnotationsSectionMatcher(true));
-            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
-            stateMatcher.withTxnId(equalTo(txnId));
-            stateMatcher.withOutcome(nullValue());
-
-            TransactionalState txState = new TransactionalState();
-            txState.setTxnId(txnId);
-            txState.setOutcome(new Accepted());
-
-            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
-
-            producer.send(session.createMessage());
-
-            // The consumer will be 'stopped' prior to rollback, however we 
will NOT send a 'drain' Flow
-            // frame as we have manipulated that all the credit was already 
used, i.e. it already stopped.
-
-            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
-            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
-
-            // Now expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            // Expect the messages that were not consumed to be released
-            for (int i = 1; i <= messageCount; i++) {
-                testPeer.expectDisposition(true, new ReleasedMatcher());
-            }
-
-            // Expect the consumer to be 'started' again as rollback completes
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
-
-            session.rollback();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout=20000)
-    public void 
testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer()
 throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            int messageCount = 5;
-            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
-            connection.start();
-
-            testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue("myQueue");
-
-            // Create a consumer, expect it to flow credit, but don't send it 
any messages
-            testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
-
-            session.createConsumer(queue);
-
-            // Create a producer to use in provoking creation of the AMQP 
transaction
-            testPeer.expectSenderAttach();
-            MessageProducer producer  = session.createProducer(queue);
-
-            // Expect the message which provoked creating the transaction
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
-            messageMatcher.setMessageAnnotationsMatcher( new 
MessageAnnotationsSectionMatcher(true));
-
-            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
-            stateMatcher.withTxnId(equalTo(txnId));
-            stateMatcher.withOutcome(nullValue());
-
-            TransactionalState txState = new TransactionalState();
-            txState.setTxnId(txnId);
-            txState.setOutcome(new Accepted());
-
-            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
-
-            producer.send(session.createMessage());
-
-            // Expect the consumer to be 'stopped' prior to rollback by 
issuing a 'drain' Flow.
-            // Action the drain by filling the prefetch (which is equivalent 
to this having happened while
-            // the Flow was in flight to the peer), and then DONT send a flow 
frame back to the client
-            // as it can tell from the messages that all the credit has been 
used.
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"),
-                                                       messageCount, true, 
false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false);
-
-            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
-            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
-            Discharge discharge = new Discharge();
-            discharge.setFail(true);
-            discharge.setTxnId(txnId);
-            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
-            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
-            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
-
-            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
-            declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            // Expect the messages that were not consumed to be released
-            for (int i = 1; i <= messageCount; i++) {
-                testPeer.expectDisposition(true, new ReleasedMatcher());
-            }
-
-            // Expect the consumer to be 'started' again as rollback completes
-            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
-
-            session.rollback();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout=20000)
-    public void 
testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws 
Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
-            connection.start();
-
-            testPeer.expectBegin();
-            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
-            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
-
-            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
-
-            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            String queueName = "myQueue";
-            Queue queue = session.createQueue(queueName);
-
-            SourceMatcher sourceMatcher = new SourceMatcher();
-            sourceMatcher.withAddress(equalTo(queueName));
-            sourceMatcher.withDynamic(equalTo(false));
-            
sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, 
Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, 
Modified.DESCRIPTOR_SYMBOL));
-            ModifiedMatcher outcomeMatcher = new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue());
-            sourceMatcher.withDefaultOutcome(outcomeMatcher);
-
-            testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
-            testPeer.expectLinkFlow();
-
-            session.createConsumer(queue);
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
 
     @Test(timeout=20000)
     public void testPrefetchPolicyInfluencesCreditFlow() throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fd981ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
new file mode 100644
index 0000000..947700b
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Error;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
+import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+/**
+ * Tests for behavior of Transacted Session operations.
+ */
+public class TransactionsIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
+
+    @Test(timeout=20000)
+    public void testTransactionCommitFailWithEmptyRejectedDisposition() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // Expect the message which was sent under the current 
transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. 
Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+
+            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with rejected and settled disposition to indicate the 
commit failed
+            Discharge discharge = new Discharge();
+            discharge.setFail(false);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Rejected(), true);
+
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, 
(byte) 4});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            try {
+                session.commit();
+                fail("Commit operation should have failed.");
+            } catch (TransactionRolledBackException jmsTxRb) {
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testProducedMessagesAfterCommitOfSentMessagesFails() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // Expect the message which was sent under the current 
transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. 
Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+
+            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with rejected and settled disposition to indicate the 
commit failed
+            Discharge discharge = new Discharge();
+            discharge.setFail(false);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            Rejected commitFailure = new Rejected(new 
Error(Symbol.valueOf("failed"), "Unknown error"));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, 
commitFailure, true);
+
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, 
(byte) 4});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            try {
+                session.commit();
+                fail("Commit operation should have failed.");
+            } catch (TransactionRolledBackException jmsTxRb) {
+            }
+
+            // Expect the message which was sent under the current 
transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. 
Respond with a
+            // TransactionalState with Accepted outcome.
+            stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testProducedMessagesAfterRollbackSentMessagesFails() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // Expect the message which was sent under the current 
transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. 
Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+
+            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with rejected and settled disposition to indicate the 
rollback failed
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            Rejected commitFailure = new Rejected(new 
Error(Symbol.valueOf("failed"), "Unknown error"));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, 
commitFailure, true);
+
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, 
(byte) 4});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            try {
+                session.rollback();
+                fail("Rollback operation should have failed.");
+            } catch (JMSException jmsex) {
+            }
+
+            // Expect the message which was sent under the current 
transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. 
Respond with a
+            // TransactionalState with Accepted outcome.
+            stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testCommitTransactedSessionWithConsumerReceivingAllMessages() 
throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1);
+    }
+
+    @Test(timeout=20000)
+    public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() 
throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(5, 2);
+    }
+
+    private void doCommitTransactedSessionWithConsumerTestImpl(int 
transferCount, int consumeCount) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 
3, (byte) 4});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                // Then expect an *settled* TransactionalState disposition for 
each message once received by the consumer
+                TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(new AcceptedMatcher());
+
+                testPeer.expectDisposition(true, stateMatcher);
+            }
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                Message receivedMessage = messageConsumer.receive(3000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with accepted and settled disposition to indicate the 
commit succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(false);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
+
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, 
(byte) 4});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            session.commit();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testProducedMessagesOnTransactedSessionCarryTxnId() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // Expect the message which was sent under the current 
transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. 
Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+
+            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void 
testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws 
Exception {
+        doRollbackTransactedSessionWithConsumerTestImpl(1, 1);
+    }
+
+    @Test(timeout=20000)
+    public void 
testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws 
Exception {
+        doRollbackTransactedSessionWithConsumerTestImpl(5, 2);
+    }
+
+    private void doRollbackTransactedSessionWithConsumerTestImpl(int 
transferCount, int consumeCount) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                // Then expect a *settled* TransactionalState disposition for 
each message once received by the consumer
+                TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(new AcceptedMatcher());
+
+                testPeer.expectDisposition(true, stateMatcher);
+            }
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                Message receivedMessage = messageConsumer.receive(3000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
+
+            // Expect the consumer to be 'stopped' prior to rollback by 
issuing a 'drain'
+            testPeer.expectLinkFlow(true, true, 
greaterThan(UnsignedInteger.ZERO));
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
+
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            // Expect the messages that were not consumed to be released
+            int unconsumed = transferCount - consumeCount;
+            for (int i = 1; i <= unconsumed; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
+            // Expect the consumer to be 'started' again as rollback completes
+            testPeer.expectLinkFlow(false, false, 
greaterThan(UnsignedInteger.ZERO));
+
+            session.rollback();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void 
testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            int messageCount = 5;
+            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a consumer and fill the prefetch with messages, which we 
wont consume any of
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), messageCount);
+
+            session.createConsumer(queue);
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // Expect the message which provoked creating the transaction
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher( new 
MessageAnnotationsSectionMatcher(true));
+            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            // The consumer will be 'stopped' prior to rollback, however we 
will NOT send a 'drain' Flow
+            // frame as we have manipulated that all the credit was already 
used, i.e. it already stopped.
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
+
+            // Now expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            // Expect the messages that were not consumed to be released
+            for (int i = 1; i <= messageCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
+            // Expect the consumer to be 'started' again as rollback completes
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
+
+            session.rollback();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void 
testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer()
 throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            int messageCount = 5;
+            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a consumer, expect it to flow credit, but don't send it 
any messages
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
+
+            session.createConsumer(queue);
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // Expect the message which provoked creating the transaction
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher( new 
MessageAnnotationsSectionMatcher(true));
+
+            TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(nullValue());
+
+            TransactionalState txState = new TransactionalState();
+            txState.setTxnId(txnId);
+            txState.setOutcome(new Accepted());
+
+            testPeer.expectTransfer(messageMatcher, stateMatcher, false, 
txState, true);
+
+            producer.send(session.createMessage());
+
+            // Expect the consumer to be 'stopped' prior to rollback by 
issuing a 'drain' Flow.
+            // Action the drain by filling the prefetch (which is equivalent 
to this having happened while
+            // the Flow was in flight to the peer), and then DONT send a flow 
frame back to the client
+            // as it can tell from the messages that all the credit has been 
used.
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"),
+                                                       messageCount, true, 
false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false);
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new 
Accepted(), true);
+
+            // Then expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, 
(byte) 8});
+            declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            // Expect the messages that were not consumed to be released
+            for (int i = 1; i <= messageCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
+            // Expect the consumer to be 'started' again as rollback completes
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
+
+            session.rollback();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void 
testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new 
Declared().setTxnId(txnId), true);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            SourceMatcher sourceMatcher = new SourceMatcher();
+            sourceMatcher.withAddress(equalTo(queueName));
+            sourceMatcher.withDynamic(equalTo(false));
+            
sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, 
Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, 
Modified.DESCRIPTOR_SYMBOL));
+            ModifiedMatcher outcomeMatcher = new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(nullValue());
+            sourceMatcher.withDefaultOutcome(outcomeMatcher);
+
+            testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+            testPeer.expectLinkFlow();
+
+            session.createConsumer(queue);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to