This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch 1.x
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/1.x by this push:
     new 408fec7f QPIDJMS-600 Ensure session and connection close await async 
sends
408fec7f is described below

commit 408fec7f3de80d6106e2dbe2ff3f688c8df363e6
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Apr 19 18:58:03 2024 -0400

    QPIDJMS-600 Ensure session and connection close await async sends
    
    Session and Connection close should be awaiting the outcome of async send
    completions before returning. This change allows them to await up to the
    close timeout value before moving on and failing any completions that are
    not completed after that point. Several tests added to cover this behavior.
    
    (cherry picked from commit 90eb60f59cb59b7b9ad8363ee8a843d6903b8e77)
---
 .../java/org/apache/qpid/jms/JmsConnection.java    |   8 +
 .../main/java/org/apache/qpid/jms/JmsSession.java  |  65 ++++++--
 .../integration/BytesMessageIntegrationTest.java   |   2 +-
 .../jms/integration/MapMessageIntegrationTest.java |   2 +-
 .../jms/integration/MessageIntegrationTest.java    |   2 +-
 .../integration/ObjectMessageIntegrationTest.java  |   1 +
 .../jms/integration/ProducerIntegrationTest.java   | 163 ++++++++++++++++++++-
 .../jms/integration/SessionIntegrationTest.java    |  80 ++++++++++
 .../integration/StreamMessageIntegrationTest.java  |   1 +
 .../integration/TextMessageIntegrationTest.java    |   1 +
 .../provider/failover/FailoverIntegrationTest.java |  63 ++++++++
 11 files changed, 365 insertions(+), 23 deletions(-)

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 4fcdefdb..cb076a36 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -916,6 +916,14 @@ public class JmsConnection implements AutoCloseable, 
Connection, TopicConnection
         }
     }
 
+    ProviderFuture newProviderFuture() {
+        return newProviderFuture(null);
+    }
+
+    ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return provider.newProviderFuture(synchronization);
+    }
+
     //----- Property setters and getters 
-------------------------------------//
 
     @Override
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 4907d169..d35e0419 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -126,6 +126,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
     private final ReentrantLock sendLock = new ReentrantLock();
     private volatile ThreadPoolExecutor deliveryExecutor;
     private volatile ThreadPoolExecutor completionExcecutor;
+    private volatile ProviderFuture asyncSendsCompletion;
     private AtomicReference<Thread> deliveryThread = new 
AtomicReference<Thread>();
     private boolean deliveryThreadCheckEnabled = true;
     private AtomicReference<Thread> completionThread = new 
AtomicReference<Thread>();
@@ -351,6 +352,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
                     for (JmsMessageProducer producer : new 
ArrayList<JmsMessageProducer>(this.producers.values())) {
                         producer.shutdown(cause);
                     }
+
                 } catch (JMSException jmsEx) {
                     shutdownError = jmsEx;
                 }
@@ -367,30 +369,52 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                     }
                 }
 
-                // Ensure that no asynchronous completion sends remain blocked 
after close.
+                try {
+                    if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
+                        acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
+                    }
+                } catch (Exception e) {
+                    LOG.trace("Exception during session shutdown cleanup 
acknowledgement", e);
+                }
+
+                // Ensure that no asynchronous completion sends remain blocked 
after close but wait
+                // using the close timeout for the asynchronous sends to 
complete normally.
+                final ExecutorService completionExecutor = 
getCompletionExecutor();
+
                 synchronized (sessionInfo) {
+                    // Producers are now quiesced and we can await completion 
of asynchronous sends
+                    // that are still pending a result or timeout once we've 
done a quick check to
+                    // see if any are actually pending or have completed 
already.
+                    asyncSendsCompletion = connection.newProviderFuture();
+
+                    completionExecutor.execute(() -> {
+                        if (asyncSendQueue.isEmpty()) {
+                            asyncSendsCompletion.onSuccess();
+                        }
+                    });
+                }
+
+                try {
+                    asyncSendsCompletion.sync(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
+                } catch (Exception ex) {
+                    LOG.trace("Exception during wait for asynchronous sends to 
complete", ex);
+                } finally {
                     if (cause == null) {
                         cause = new JMSException("Session closed remotely 
before message transfer result was notified");
                     }
 
-                    getCompletionExecutor().execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
-                    getCompletionExecutor().shutdown();
+                    // as a last task we want to fail any stragglers in the 
asynchronous send queue and then
+                    // shutdown the queue to prevent any more submissions 
while the cleanup goes on.
+                    completionExecutor.execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+                    completionExecutor.shutdown();
                 }
 
                 try {
-                    
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
+                    
completionExecutor.awaitTermination(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     LOG.trace("Session close awaiting send completions was 
interrupted");
                 }
 
-                try {
-                    if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
-                        acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
-                    }
-                } catch (Exception e) {
-                    LOG.trace("Exception during session shutdown cleanup 
acknowledgement", e);
-                }
-
                 if (shutdownError != null) {
                     throw shutdownError;
                 }
@@ -856,11 +880,12 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
     }
 
     private void send(JmsMessageProducer producer, JmsDestination destination, 
Message original, int deliveryMode, int priority, long timeToLive, boolean 
disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener 
listener) throws JMSException {
-        sendLock.lock();
-
         JmsMessage outbound = null;
+        sendLock.lock();
 
         try {
+            checkClosed();
+
             original.setJMSDeliveryMode(deliveryMode);
             original.setJMSPriority(priority);
             original.setJMSRedelivered(false);
@@ -909,7 +934,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
             }
 
             outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
-            if(!isJmsMessage) {
+            if (!isJmsMessage) {
                 // If the original was a foreign message, we still need to 
update it too.
                 setForeignMessageDeliveryTime(original, deliveryTime);
             }
@@ -977,7 +1002,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
             }
         } catch (JMSException jmsEx) {
             // Ensure that on failure case the message is returned to usable 
state for another send attempt.
-            if(outbound != null) {
+            if (outbound != null) {
                 outbound.onSendComplete();
             }
             throw jmsEx;
@@ -1511,6 +1536,10 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
             if (producerId == null) {
                 asyncSendQueue.clear();
             }
+
+            if (closed.get() && asyncSendsCompletion != null && 
asyncSendQueue.isEmpty()) {
+                asyncSendsCompletion.onSuccess();
+            }
         }
     }
 
@@ -1577,6 +1606,10 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                         }
                     }
                 }
+
+                if (closed.get() && asyncSendsCompletion != null && 
asyncSendQueue.isEmpty()) {
+                    asyncSendsCompletion.onSuccess();
+                }
             } catch (Exception ex) {
                 LOG.error("Async completion task encountered unexpected 
failure", ex);
             }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 639c5d8f..298d97e3 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -581,7 +581,7 @@ public class BytesMessageIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws 
Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 954938f0..60b9c81c 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -440,7 +440,7 @@ public class MapMessageIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void testAsyncCompletionSendMarksMapMessageReadOnly() throws 
Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 60f3020d..19bcf698 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -2231,7 +2231,7 @@ public class MessageIntegrationTest extends 
QpidJmsTestCase
     @Timeout(20)
     public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception 
{
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index ebe2dcec..5b6911ef 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -660,6 +660,7 @@ public class ObjectMessageIntegrationTest extends 
QpidJmsTestCase {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setCloseTimeout(10);
 
             testPeer.expectBegin();
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index e01c6a4f..4dd6b01d 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2322,7 +2322,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void testAsyncCompletionGetsNotifiedWhenSessionClosed() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=100");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2343,6 +2343,8 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
 
             producer.send(message, listener);
 
+            assertFalse(listener.hasCompleted()); // Close should complete it 
as failed on timeout
+
             session.close();
 
             assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
@@ -2356,11 +2358,51 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void 
testAsyncCompletionGetsNotifiedWhenSessionClosedAndWaitForCompletion() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), 
nullValue(), false, true, new Accepted(), true, 0, 100);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertFalse(listener.hasCompleted()); // Close should complete it 
as accepted after the delay
+
+            session.close();
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     @Test
     @Timeout(20)
     public void testAsyncCompletionGetsNotifiedWhenConnectionClosed() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2380,6 +2422,8 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
 
             producer.send(message, listener);
 
+            assertFalse(listener.hasCompleted());
+
             connection.close();
 
             assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
@@ -2391,6 +2435,43 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void 
testAsyncCompletionAllowedToCompleteNormallyWhenConnectionClosed() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), 
nullValue(), false, true, new Accepted(), true, 0, 100);
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertFalse(listener.hasCompleted());
+
+            connection.close();
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     @Test
     @Timeout(20)
     public void testAsyncCompletionResetsBytesMessage() throws Exception {
@@ -2856,7 +2937,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void 
testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() 
throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2895,7 +2976,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
 
             assertFalse(listener.awaitCompletion(10, TimeUnit.MILLISECONDS), 
"Should not get async callback");
 
-            // Closing the session should complete the send with an exception
+            // Closing the session should complete the send with an exception 
after timeout
             testPeer.expectEnd();
             session.close();
 
@@ -3044,6 +3125,10 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
             this.completed = new CountDownLatch(expected);
         }
 
+        public boolean hasCompleted() {
+            return completed.getCount() == 0;
+        }
+
         public boolean awaitCompletion(long timeout, TimeUnit units) throws 
InterruptedException {
             return completed.await(timeout, units);
         }
@@ -3239,4 +3324,74 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test
+    @Timeout(20)
+    public void testRemotelyEndConnectionCompletesAsyncSends() throws 
Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch connectionClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionFailure(Throwable exception) {
+                    connectionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new 
TransferPayloadCompositeMatcher());
+            }
+
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+            testPeer.expectSenderAttach();
+            testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_DELETED, 
BREAD_CRUMB, 50);
+
+            session.createProducer(queue);
+
+            // Verify the session gets marked closed
+            assertTrue(connectionClosed.await(5, TimeUnit.SECONDS), "Session 
closed callback didn't trigger");
+
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount); // All sends should 
have been failed
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 98a12eca..c8d98145 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -2333,6 +2333,10 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             completed = new CountDownLatch(expected);
         }
 
+        public boolean hasCompleted() {
+            return completed.getCount() == 0;
+        }
+
         public boolean awaitCompletion(long timeout, TimeUnit units) throws 
InterruptedException {
             return completed.await(timeout, units);
         }
@@ -2836,6 +2840,82 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void testRemotelyEndSessionCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Throwable 
exception) {
+                    sessionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new 
TransferPayloadCompositeMatcher());
+            }
+
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            assertFalse(listener.hasCompleted());
+
+            testPeer.expectSenderAttach();
+            testPeer.remotelyEndLastOpenedSession(true, 50, 
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+            session.createProducer(queue);
+
+            // Verify the session gets marked closed
+            assertTrue(sessionClosed.await(5, TimeUnit.SECONDS), "Session 
closed callback didn't trigger");
+
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount); // All sends should 
have been failed
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            session.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private boolean verifyConsumerClosure(final String BREAD_CRUMB, final 
MessageConsumer consumer) throws Exception {
         return Wait.waitFor(new Wait.Condition() {
             @Override
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index b7906e48..3b09dfc7 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -439,6 +439,7 @@ public class StreamMessageIntegrationTest extends 
QpidJmsTestCase {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setSendTimeout(15);
 
             testPeer.expectBegin();
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index 4653447c..1d907c6b 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -447,6 +447,7 @@ public class TextMessageIntegrationTest extends 
QpidJmsTestCase {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setCloseTimeout(15);
 
             testPeer.expectBegin();
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 9ae890f7..a6e350a8 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -4558,6 +4558,69 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void testFailoverDoesFailPendingAsyncCompletionSend() throws 
Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.expectTransferButDoNotRespond(new 
TransferPayloadCompositeMatcher());
+            originalPeer.dropAfterLastHandler(15);  // Wait for sender to get 
into wait state
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+
+            final JmsConnection connection = 
establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, 
finalPeer);
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            // This should fire after reconnect without an error, if it fires 
with an error at
+            // any time then something is wrong.
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
+            assertNotNull(listener.exception, "Completion should have been due 
to error");
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            finalPeer.waitForAllHandlersToComplete(5000);
+            finalPeer.expectClose();
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(5000);
+        }
+    }
+
     @Test
     @Timeout(20)
     public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws 
Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to