Repository: qpid-jms
Updated Branches:
  refs/heads/master 590b3c65d -> 0fe6b0471


QPIDJMS-175 Add a test around drain timeout on transaction rollback with
an initial change to allow the test to pass.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1b05e92c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1b05e92c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1b05e92c

Branch: refs/heads/master
Commit: 1b05e92c2465620af07273c83cb0e15ee8833b8f
Parents: 590b3c6
Author: Timothy Bish <[email protected]>
Authored: Tue May 10 17:29:07 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue May 10 17:29:07 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 12 ++--
 .../TransactionsIntegrationTest.java            | 64 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 1c8f2d8..ea57503 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -192,12 +192,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
 
         // Stop processing any new messages that arrive
-        for (JmsMessageConsumer c : consumers.values()) {
-            c.suspendForRollback();
+        try {
+            for (JmsMessageConsumer c : consumers.values()) {
+                c.suspendForRollback();
+            }
+        } finally {
+            transactionContext.rollback();
         }
 
-        transactionContext.rollback();
-
+        // Currently some consumers won't get suspended and some won't restart
+        // after a failed rollback.
         for (JmsMessageConsumer c : consumers.values()) {
             c.resumeAfterRollback();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index dfd2f1c..2ce34fd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1211,4 +1211,68 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testRollbackWithNoResponseForSuspendConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=1000");
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 2);
+
+            // Then expect a *settled* TransactionalState disposition for the message once received by the consumer
+            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(new AcceptedMatcher());
+
+            testPeer.expectDisposition(true, stateMatcher);
+
+            // Read one so we try to suspend on rollback
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+
+            // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+            testPeer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO));
+
+            // Expect the consumer to be closed after drain timeout
+            testPeer.expectDetach(true, true, true);
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the rollback succeeded
+            testPeer.expectDischarge(txnId, true);
+
+            // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            try {
+                session.rollback();
+                //fail("Consumer should have failed to stop and caused an error on rollback.");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to