Repository: qpid-jms Updated Branches: refs/heads/master 590b3c65d -> 0fe6b0471
QPIDJMS-175 Add a test around drain timeout on transaction rollback with an initial change to allow the test to pass. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1b05e92c Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1b05e92c Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1b05e92c Branch: refs/heads/master Commit: 1b05e92c2465620af07273c83cb0e15ee8833b8f Parents: 590b3c6 Author: Timothy Bish <[email protected]> Authored: Tue May 10 17:29:07 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 10 17:29:07 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsSession.java | 12 ++-- .../TransactionsIntegrationTest.java | 64 ++++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 1c8f2d8..ea57503 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -192,12 +192,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } // Stop processing any new messages that arrive - for (JmsMessageConsumer c : consumers.values()) { - c.suspendForRollback(); + try { + for (JmsMessageConsumer c : consumers.values()) { + c.suspendForRollback(); + } + } finally { + transactionContext.rollback(); } - transactionContext.rollback(); - + // Currently some consumers won't get suspended and some won't restart + // after a failed rollback. for (JmsMessageConsumer c : consumers.values()) { c.resumeAfterRollback(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java index dfd2f1c..2ce34fd 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java @@ -1211,4 +1211,68 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout = 20000) + public void testRollbackWithNoResponseForSuspendConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=1000"); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 2); + + // Then expect a *settled* TransactionalState disposition for the message once received by the consumer + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(new AcceptedMatcher()); + + testPeer.expectDisposition(true, stateMatcher); + + // Read one so we try to suspend on rollback + MessageConsumer messageConsumer = session.createConsumer(queue); + Message receivedMessage = messageConsumer.receive(3000); + + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + + // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain' + testPeer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO)); + + // Expect the consumer to be closed after drain timeout + testPeer.expectDetach(true, true, true); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + testPeer.expectDischarge(txnId, true); + + // Then expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + + try { + session.rollback(); + //fail("Consumer should have failed to stop and caused an error on rollback."); + } catch (JMSException ex) { + // Expected + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
