This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 16988f3  QPIDJMS-457 Ensure that on send failure the message state is reset
16988f3 is described below

commit 16988f3307588fbdfb73558bf469ad0128997190
Author: Timothy Bish <[email protected]>
AuthorDate: Mon May 13 14:21:22 2019 -0400

    QPIDJMS-457 Ensure that on send failure the message state is reset
    
    On sync send failure ensure that message state resets to writable so
    that the message can be resent if desired.
---
 .../main/java/org/apache/qpid/jms/JmsSession.java  |  8 ++-
 .../jms/integration/ProducerIntegrationTest.java   | 59 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index ea70544..99f1392 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -813,6 +813,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
         sendLock.lock();
+
+        JmsMessage outbound = null;
+
         try {
             original.setJMSDeliveryMode(deliveryMode);
             original.setJMSPriority(priority);
@@ -843,7 +846,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                 messageId = producer.getMessageIDBuilder().createMessageID(producer.getProducerId().toString(), messageSequence);
             }
 
-            JmsMessage outbound = null;
             if (isJmsMessage) {
                 outbound = (JmsMessage) original;
             } else {
@@ -929,6 +931,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             } else {
                 transactionContext.send(connection, envelope, null);
             }
+        } catch (JMSException jmsEx) {
+            // Ensure that on failure case the message is returned to usable state for another send attempt.
+            outbound.onSendComplete();
+            throw jmsEx;
         } finally {
             sendLock.unlock();
         }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 4bebb03..5207caf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.BytesMessage;
@@ -3119,4 +3120,62 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             completed.countDown();
         }
     }
+
+    @Test(timeout = 20000)
+    public void testFailedSendToOfflineConnectionMessageCanBeResentToNewConnection() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final AtomicBoolean exceptionListenerFired = new AtomicBoolean();
+            final String text = "my-message-body-text";
+
+            //----- Initial connection expectations and failure instructions
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(originalPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException exception) {
+                    LOG.trace("JMS ExceptionListener: ", exception);
+                    exceptionListenerFired.set(true);
+                }
+            });
+
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.dropAfterLastHandler();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            TextMessage message = session.createTextMessage(text);
+            Queue queue = session.createQueue("myQueue");
+
+            // initial producer which will be sent to after connection fails
+            MessageProducer producer = session.createProducer(queue);
+
+            // Await connection drop and then send to trigger failure from closed connection.
+            assertFalse("The ExceptionListener should not have been alerted", exceptionListenerFired.get());
+
+            try {
+                producer.send(message);
+                fail("Should have failed to send to failed connection.");
+            } catch (JMSException jmsEx) {
+            }
+
+            // --- Post Reconnection Expectations of this test
+            connection = (JmsConnection) testFixture.establishConnecton(finalPeer);
+
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), new Accepted(), true);
+            finalPeer.expectClose();
+
+            // Reconnect to another peer and send the failed message again which should work
+            // without need to reset or otherwise account for past failure.
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue("myQueue");
+            producer = session.createProducer(queue);
+            producer.send(message);
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to