Repository: qpid-jms Updated Branches: refs/heads/master 7226605a7 -> e676248c7
QPIDJMS-394 Fix failover wrongly signalling async completion Ensure that an inflight async completion that is held in the Failover provider is not signaled as failed by the session during connection recovery. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e676248c Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e676248c Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e676248c Branch: refs/heads/master Commit: e676248c74e9747031e5a89a640442bb51ffcb28 Parents: 7226605 Author: Timothy Bish <tabish...@gmail.com> Authored: Tue Jun 12 15:24:32 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Tue Jun 12 15:24:32 2018 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 4 -- .../qpid/jms/JmsLocalTransactionContext.java | 12 +++- .../qpid/jms/JmsNoTxTransactionContext.java | 5 +- .../java/org/apache/qpid/jms/JmsSession.java | 31 +++++---- .../apache/qpid/jms/JmsTransactionContext.java | 5 +- .../failover/FailoverIntegrationTest.java | 68 ++++++++++++++++++++ 6 files changed, 101 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 9f828ab..338ad5d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -760,10 +760,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } } - void send(JmsOutboundMessageDispatch envelope) throws JMSException { - send(envelope, null); - } - void send(JmsOutboundMessageDispatch envelope, ProviderSynchronization synchronization) throws JMSException { checkClosedOrFailed(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java index bf13888..c60c5e6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -56,10 +56,14 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { } @Override - public void send(JmsConnection connection, final JmsOutboundMessageDispatch envelope) throws JMSException { + public void send(JmsConnection connection, final JmsOutboundMessageDispatch envelope, ProviderSynchronization outcome) throws JMSException { lock.readLock().lock(); try { if (isInDoubt()) { + // Need to signal that the request is going to pass before completing + if (outcome != null) { + outcome.onPendingSuccess(); + } // Ensure that asynchronous completions get signaled while TX is in doubt if (envelope.isCompletionRequired()) { connection.onCompletedMessageSend(envelope); @@ -74,12 +78,18 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { public void onPendingSuccess() { LOG.trace("TX:{} has performed a send.", getTransactionId()); participants.put(envelope.getProducerId(), envelope.getProducerId()); + if (outcome != null) { + outcome.onPendingSuccess(); + } } @Override public void onPendingFailure(Throwable cause) { LOG.trace("TX:{} has a failed send.", getTransactionId()); participants.put(envelope.getProducerId(), envelope.getProducerId()); + if (outcome != null) { + outcome.onPendingFailure(cause); + } } }); } finally { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java index bb3e421..360988f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java @@ -24,6 +24,7 @@ import org.apache.qpid.jms.meta.JmsResourceId; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.provider.ProviderSynchronization; /** * Used in non-transacted JMS Sessions to throw proper errors indicating @@ -32,8 +33,8 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; public class JmsNoTxTransactionContext implements JmsTransactionContext { @Override - public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException { - connection.send(envelope); + public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope, ProviderSynchronization outcome) throws JMSException { + connection.send(envelope, outcome); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index b8c88f6..9711728 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -92,6 +92,7 @@ import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderSynchronization; import org.apache.qpid.jms.selector.SelectorParser; import org.apache.qpid.jms.selector.filter.FilterException; import org.apache.qpid.jms.util.NoOpExecutor; @@ -889,26 +890,24 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe outbound.onSendComplete(); } - SendCompletion completion = null; if (envelope.isCompletionRequired()) { - completion = new SendCompletion(envelope, listener); - asyncSendQueue.addLast(completion); - } + transactionContext.send(connection, envelope, new ProviderSynchronization() { - try { - transactionContext.send(connection, envelope); - } catch (JMSException jmsEx) { - // If the synchronous portion of the send fails the completion be - // notified but might depending on the circumstances of the failures, - // remove it from the queue and check if is is already completed. - if (completion != null) { - asyncSendQueue.remove(completion); - if (completion.hasCompleted()) { - return; + @Override + public void onPendingSuccess() { + // Provider accepted the send request so new we place the marker in + // the queue so that it can be completed asynchronously. + asyncSendQueue.addLast(new SendCompletion(envelope, listener)); } - } - throw jmsEx; + @Override + public void onPendingFailure(Throwable cause) { + // Provider has rejected the send request so we will throw the + // exception that is to follow so no completion will be needed. + } + }); + } else { + transactionContext.send(connection, envelope, null); } } finally { sendLock.unlock(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java index d076000..f9f5c17 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java @@ -24,6 +24,7 @@ import org.apache.qpid.jms.meta.JmsResourceId; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.provider.ProviderSynchronization; /** * A Transaction Context is used to track and manage the state of a @@ -55,10 +56,12 @@ public interface JmsTransactionContext { * the connection that will be do the send of the message * @param envelope * the envelope that contains the message to be sent. + * @param outcome + * Synchronization used to set state prior to completion of the send call. * * @throws JMSException if an error occurs during the send. */ - void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException; + void send(JmsConnection connection, JmsOutboundMessageDispatch envelope, ProviderSynchronization outcome) throws JMSException; /** * @return if the currently transaction has been marked as being in an unknown state. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 99b47dd..f117fc3 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -83,16 +83,20 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.jms.util.QpidJMSTestRunner; +import org.apache.qpid.jms.util.Repeat; import org.apache.qpid.jms.util.StopWatch; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(QpidJMSTestRunner.class) public class FailoverIntegrationTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class); @@ -2648,6 +2652,70 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } } + @Repeat(repetitions = 100) + @Test(timeout = 20000) + public void testFailoverDoesNotFailPendingAsyncCompletionSend() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + // Ensure our send blocks in the provider waiting for credit so that on failover + // the message will actually get sent from the Failover bits once we grant some + // credit for the recovered sender. + originalPeer.expectSenderAttachWithoutGrantingCredit(); + originalPeer.dropAfterLastHandler(10); // Wait for sender to get into wait state + + // --- Post Failover Expectations of sender --- // + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectSenderAttach(); + finalPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + finalPeer.expectClose(); + + final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, finalPeer); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer = session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + + try { + producer.send(message, listener); + } catch (JMSException jmsEx) { + fail("Should not have failed the async completion send."); + } + + // This should fire after reconnect without an error, if it fires with an error at + // any time then something is wrong. + assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull("Completion should not have been on error", listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + connection.close(); + + finalPeer.waitForAllHandlersToComplete(1000); + } + } + private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException { return establishAnonymousConnecton(null, null, peers); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org