Repository: qpid-jms
Updated Branches:
  refs/heads/master 7226605a7 -> e676248c7


QPIDJMS-394 Fix failover wrongly signalling async completion

Ensure that an inflight async completion that is held in the
Failover provider is not signaled as failed by the session
during connection recovery.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e676248c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e676248c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e676248c

Branch: refs/heads/master
Commit: e676248c74e9747031e5a89a640442bb51ffcb28
Parents: 7226605
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Jun 12 15:24:32 2018 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Jun 12 15:24:32 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  4 --
 .../qpid/jms/JmsLocalTransactionContext.java    | 12 +++-
 .../qpid/jms/JmsNoTxTransactionContext.java     |  5 +-
 .../java/org/apache/qpid/jms/JmsSession.java    | 31 +++++----
 .../apache/qpid/jms/JmsTransactionContext.java  |  5 +-
 .../failover/FailoverIntegrationTest.java       | 68 ++++++++++++++++++++
 6 files changed, 101 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 9f828ab..338ad5d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -760,10 +760,6 @@ public class JmsConnection implements AutoCloseable, 
Connection, TopicConnection
         }
     }
 
-    void send(JmsOutboundMessageDispatch envelope) throws JMSException {
-        send(envelope, null);
-    }
-
     void send(JmsOutboundMessageDispatch envelope, ProviderSynchronization 
synchronization) throws JMSException {
         checkClosedOrFailed();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index bf13888..c60c5e6 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -56,10 +56,14 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
     }
 
     @Override
-    public void send(JmsConnection connection, final 
JmsOutboundMessageDispatch envelope) throws JMSException {
+    public void send(JmsConnection connection, final 
JmsOutboundMessageDispatch envelope, ProviderSynchronization outcome) throws 
JMSException {
         lock.readLock().lock();
         try {
             if (isInDoubt()) {
+                // Need to signal that the request is going to pass before 
completing
+                if (outcome != null) {
+                    outcome.onPendingSuccess();
+                }
                 // Ensure that asynchronous completions get signaled while TX 
is in doubt
                 if (envelope.isCompletionRequired()) {
                     connection.onCompletedMessageSend(envelope);
@@ -74,12 +78,18 @@ public class JmsLocalTransactionContext implements 
JmsTransactionContext {
                 public void onPendingSuccess() {
                     LOG.trace("TX:{} has performed a send.", 
getTransactionId());
                     participants.put(envelope.getProducerId(), 
envelope.getProducerId());
+                    if (outcome != null) {
+                        outcome.onPendingSuccess();
+                    }
                 }
 
                 @Override
                 public void onPendingFailure(Throwable cause) {
                     LOG.trace("TX:{} has a failed send.", getTransactionId());
                     participants.put(envelope.getProducerId(), 
envelope.getProducerId());
+                    if (outcome != null) {
+                        outcome.onPendingFailure(cause);
+                    }
                 }
             });
         } finally {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index bb3e421..360988f 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -24,6 +24,7 @@ import org.apache.qpid.jms.meta.JmsResourceId;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 
 /**
  * Used in non-transacted JMS Sessions to throw proper errors indicating
@@ -32,8 +33,8 @@ import 
org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 public class JmsNoTxTransactionContext implements JmsTransactionContext {
 
     @Override
-    public void send(JmsConnection connection, JmsOutboundMessageDispatch 
envelope) throws JMSException {
-        connection.send(envelope);
+    public void send(JmsConnection connection, JmsOutboundMessageDispatch 
envelope, ProviderSynchronization outcome) throws JMSException {
+        connection.send(envelope, outcome);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index b8c88f6..9711728 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -92,6 +92,7 @@ import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.selector.SelectorParser;
 import org.apache.qpid.jms.selector.filter.FilterException;
 import org.apache.qpid.jms.util.NoOpExecutor;
@@ -889,26 +890,24 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                 outbound.onSendComplete();
             }
 
-            SendCompletion completion = null;
             if (envelope.isCompletionRequired()) {
-                completion = new SendCompletion(envelope, listener);
-                asyncSendQueue.addLast(completion);
-            }
+                transactionContext.send(connection, envelope, new 
ProviderSynchronization() {
 
-            try {
-                transactionContext.send(connection, envelope);
-            } catch (JMSException jmsEx) {
-                // If the synchronous portion of the send fails the completion 
be
-                // notified but might depending on the circumstances of the 
failures,
-                // remove it from the queue and check if it is already 
completed.
-                if (completion != null) {
-                    asyncSendQueue.remove(completion);
-                    if (completion.hasCompleted()) {
-                        return;
+                    @Override
+                    public void onPendingSuccess() {
+                        // Provider accepted the send request so now we place 
the marker in
+                        // the queue so that it can be completed 
asynchronously.
+                        asyncSendQueue.addLast(new SendCompletion(envelope, 
listener));
                     }
-                }
 
-                throw jmsEx;
+                    @Override
+                    public void onPendingFailure(Throwable cause) {
+                        // Provider has rejected the send request; the 
exception that follows
+                        // will be thrown, so no completion marker is 
needed.
+                    }
+                });
+            } else {
+                transactionContext.send(connection, envelope, null);
             }
         } finally {
             sendLock.unlock();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index d076000..f9f5c17 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -24,6 +24,7 @@ import org.apache.qpid.jms.meta.JmsResourceId;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 
 /**
  * A Transaction Context is used to track and manage the state of a
@@ -55,10 +56,12 @@ public interface JmsTransactionContext {
      *        the connection that will do the send of the message
      * @param envelope
      *        the envelope that contains the message to be sent.
+     * @param outcome
+     *               Synchronization used to set state prior to completion of 
the send call.
      *
      * @throws JMSException if an error occurs during the send.
      */
-    void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) 
throws JMSException;
+    void send(JmsConnection connection, JmsOutboundMessageDispatch envelope, 
ProviderSynchronization outcome) throws JMSException;
 
     /**
      * @return whether the current transaction has been marked as being in an 
unknown state.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e676248c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 99b47dd..f117fc3 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -83,16 +83,20 @@ import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
 import org.apache.qpid.jms.util.StopWatch;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(QpidJMSTestRunner.class)
 public class FailoverIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FailoverIntegrationTest.class);
@@ -2648,6 +2652,70 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 100)
+    @Test(timeout = 20000)
+    public void testFailoverDoesNotFailPendingAsyncCompletionSend() throws 
Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            // Ensure our send blocks in the provider waiting for credit so 
that on failover
+            // the message will actually get sent from the Failover bits once 
we grant some
+            // credit for the recovered sender.
+            originalPeer.expectSenderAttachWithoutGrantingCredit();
+            originalPeer.dropAfterLastHandler(10);  // Wait for sender to get 
into wait state
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            finalPeer.expectClose();
+
+            final JmsConnection connection = 
establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, 
finalPeer);
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            // This should fire after reconnect without an error, if it fires 
with an error at
+            // any time then something is wrong.
+            assertTrue("Did not get async callback", 
listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull("Completion should not have been on error", 
listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) 
throws JMSException {
         return establishAnonymousConnecton(null, null, peers);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to