Repository: qpid-jms Updated Branches: refs/heads/master 0fd981ca1 -> 845f75db6
QPIDJMS-125 Remove extra call to TX rollback on session close and add some tests in failover for tx recreate on reconnect. Fix some issues in the failover integration test. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/845f75db Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/845f75db Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/845f75db Branch: refs/heads/master Commit: 845f75db6ca3c2a67274aafff5cfc65b465705c6 Parents: 0fd981c Author: Timothy Bish <[email protected]> Authored: Tue Oct 13 18:18:53 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Oct 13 18:18:53 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsSession.java | 4 - .../failover/FailoverIntegrationTest.java | 130 ++++++++++++++++++- 2 files changed, 128 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/845f75db/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index eee84b8..c0a361b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -229,10 +229,6 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa protected void doClose() throws JMSException { boolean interrupted = Thread.interrupted(); shutdown(); - try { - transactionContext.rollback(); - } catch (JMSException e) { - } connection.removeSession(sessionInfo); connection.destroyResource(sessionInfo); if (interrupted) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/845f75db/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index a52cc79..f5063c9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -54,12 +54,18 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; +import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; +import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; import org.apache.qpid.jms.util.StopWatch; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; @@ -86,6 +92,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { fail("Should have thrown JMSSecurityException: " + ex); } + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } @@ -148,7 +156,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); - //Shut it down + // Shut it down finalPeer.expectClose(); connection.close(); finalPeer.waitForAllHandlersToComplete(1000); @@ -258,7 +266,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { Throwable t = problem.get(); assertTrue("Sender thread should have completed. Problem: " + t, await); - //Shut it down + // Shut it down finalPeer.expectClose(); connection.close(); finalPeer.waitForAllHandlersToComplete(1000); @@ -320,6 +328,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + // Shut it down + finalPeer.expectClose(); + connection.close(); + finalPeer.waitForAllHandlersToComplete(1000); } } @@ -348,6 +360,12 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { String message = "Initial connect should not have delayed for the specified initialReconnectDelay." + "Elapsed=" + taken + ", delay=" + delay; assertTrue(message, taken < delay); assertTrue("Connection took longer than reasonable: " + taken, taken < 5000); + + // Shut it down + originalPeer.expectClose(); + connection.close(); + + originalPeer.waitForAllHandlersToComplete(2000); } } @@ -416,6 +434,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { consumer.close(); + // Shut it down + finalPeer.expectClose(); + connection.close(); + finalPeer.waitForAllHandlersToComplete(1000); } } @@ -488,6 +510,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { LOG.info("Closing consumer"); consumer.close(); + // Shut it down + finalPeer.expectClose(); + connection.close(); + finalPeer.waitForAllHandlersToComplete(1000); } } @@ -562,6 +588,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { LOG.info("Closing consumer"); consumer.close(); + // Shut it down + finalPeer.expectClose(); + connection.close(); + finalPeer.waitForAllHandlersToComplete(1000); } } @@ -633,6 +663,12 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { assertFalse(queueView.hasMoreElements()); browser.close(); + + // Shut it down + finalPeer.expectClose(); + connection.close(); + + finalPeer.waitForAllHandlersToComplete(1000); } } @@ -678,10 +714,100 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { LOG.info("Test caught expected error: {}", ide.getMessage()); } + // Shut it down + testPeer.expectClose(); + connection.close(); + testPeer.waitForAllHandlersToComplete(1000); } } + @Test(timeout=20000) + public void testTxRecreatedAfterConnectionFailsOver() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + originalPeer.expectSaslAnonymousConnect(); + originalPeer.expectBegin(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + + originalPeer.expectBegin(); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + originalPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a Declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + originalPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + originalPeer.dropAfterLastHandler(); + + // --- Post Failover Expectations of FinalPeer --- // + + finalPeer.expectSaslAnonymousConnect(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + finalPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded. + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + finalPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true); + finalPeer.expectEnd(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + session.close(); + + // Shut it down + finalPeer.expectClose(); + connection.close(); + + originalPeer.waitForAllHandlersToComplete(2000); + finalPeer.waitForAllHandlersToComplete(1000); + } + } + private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException { return establishAnonymousConnecton(null, peers); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
