This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch 1.x
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/1.x by this push:
new 408fec7f QPIDJMS-600 Ensure session and connection close await async
sends
408fec7f is described below
commit 408fec7f3de80d6106e2dbe2ff3f688c8df363e6
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Apr 19 18:58:03 2024 -0400
QPIDJMS-600 Ensure session and connection close await async sends
Session and Connection close should be awaiting the outcome of async send
completions before returning. This change allows them to await up to the
close timeout value before moving on and failing any completions that are
not completed after that point. Several tests added to cover this behavior.
(cherry picked from commit 90eb60f59cb59b7b9ad8363ee8a843d6903b8e77)
---
.../java/org/apache/qpid/jms/JmsConnection.java | 8 +
.../main/java/org/apache/qpid/jms/JmsSession.java | 65 ++++++--
.../integration/BytesMessageIntegrationTest.java | 2 +-
.../jms/integration/MapMessageIntegrationTest.java | 2 +-
.../jms/integration/MessageIntegrationTest.java | 2 +-
.../integration/ObjectMessageIntegrationTest.java | 1 +
.../jms/integration/ProducerIntegrationTest.java | 163 ++++++++++++++++++++-
.../jms/integration/SessionIntegrationTest.java | 80 ++++++++++
.../integration/StreamMessageIntegrationTest.java | 1 +
.../integration/TextMessageIntegrationTest.java | 1 +
.../provider/failover/FailoverIntegrationTest.java | 63 ++++++++
11 files changed, 365 insertions(+), 23 deletions(-)
diff --git
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 4fcdefdb..cb076a36 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -916,6 +916,14 @@ public class JmsConnection implements AutoCloseable,
Connection, TopicConnection
}
}
+ ProviderFuture newProviderFuture() {
+ return newProviderFuture(null);
+ }
+
+ ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+ return provider.newProviderFuture(synchronization);
+ }
+
//----- Property setters and getters
-------------------------------------//
@Override
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 4907d169..d35e0419 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -126,6 +126,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ThreadPoolExecutor deliveryExecutor;
private volatile ThreadPoolExecutor completionExcecutor;
+ private volatile ProviderFuture asyncSendsCompletion;
private AtomicReference<Thread> deliveryThread = new
AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new
AtomicReference<Thread>();
@@ -351,6 +352,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
for (JmsMessageProducer producer : new
ArrayList<JmsMessageProducer>(this.producers.values())) {
producer.shutdown(cause);
}
+
} catch (JMSException jmsEx) {
shutdownError = jmsEx;
}
@@ -367,30 +369,52 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
}
}
- // Ensure that no asynchronous completion sends remain blocked
after close.
+ try {
+ if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
+ acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
+ }
+ } catch (Exception e) {
+ LOG.trace("Exception during session shutdown cleanup
acknowledgement", e);
+ }
+
+ // Ensure that no asynchronous completion sends remain blocked
after close but wait
+ // using the close timeout for the asynchronous sends to
complete normally.
+ final ExecutorService completionExecutor =
getCompletionExecutor();
+
synchronized (sessionInfo) {
+ // Producers are now quiesced and we can await completion
of asynchronous sends
+ // that are still pending a result or timeout once we've
done a quick check to
+ // see if any are actually pending or have completed
already.
+ asyncSendsCompletion = connection.newProviderFuture();
+
+ completionExecutor.execute(() -> {
+ if (asyncSendQueue.isEmpty()) {
+ asyncSendsCompletion.onSuccess();
+ }
+ });
+ }
+
+ try {
+ asyncSendsCompletion.sync(connection.getCloseTimeout(),
TimeUnit.MILLISECONDS);
+ } catch (Exception ex) {
+ LOG.trace("Exception during wait for asynchronous sends to
complete", ex);
+ } finally {
if (cause == null) {
cause = new JMSException("Session closed remotely
before message transfer result was notified");
}
- getCompletionExecutor().execute(new
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
- getCompletionExecutor().shutdown();
+ // As a last task we want to fail any stragglers in the asynchronous send queue and then
+ // shut down the completion executor to prevent any more submissions while the cleanup goes on.
+ completionExecutor.execute(new
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+ completionExecutor.shutdown();
}
try {
-
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(),
TimeUnit.MILLISECONDS);
+
completionExecutor.awaitTermination(connection.getCloseTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was
interrupted");
}
- try {
- if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
- acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
- }
- } catch (Exception e) {
- LOG.trace("Exception during session shutdown cleanup
acknowledgement", e);
- }
-
if (shutdownError != null) {
throw shutdownError;
}
@@ -856,11 +880,12 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
}
private void send(JmsMessageProducer producer, JmsDestination destination,
Message original, int deliveryMode, int priority, long timeToLive, boolean
disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener
listener) throws JMSException {
- sendLock.lock();
-
JmsMessage outbound = null;
+ sendLock.lock();
try {
+ checkClosed();
+
original.setJMSDeliveryMode(deliveryMode);
original.setJMSPriority(priority);
original.setJMSRedelivered(false);
@@ -909,7 +934,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
}
outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
- if(!isJmsMessage) {
+ if (!isJmsMessage) {
// If the original was a foreign message, we still need to
update it too.
setForeignMessageDeliveryTime(original, deliveryTime);
}
@@ -977,7 +1002,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
}
} catch (JMSException jmsEx) {
// Ensure that on failure case the message is returned to usable
state for another send attempt.
- if(outbound != null) {
+ if (outbound != null) {
outbound.onSendComplete();
}
throw jmsEx;
@@ -1511,6 +1536,10 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
if (producerId == null) {
asyncSendQueue.clear();
}
+
+ if (closed.get() && asyncSendsCompletion != null &&
asyncSendQueue.isEmpty()) {
+ asyncSendsCompletion.onSuccess();
+ }
}
}
@@ -1577,6 +1606,10 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
}
}
}
+
+ if (closed.get() && asyncSendsCompletion != null &&
asyncSendQueue.isEmpty()) {
+ asyncSendsCompletion.onSuccess();
+ }
} catch (Exception ex) {
LOG.error("Async completion task encountered unexpected
failure", ex);
}
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 639c5d8f..298d97e3 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -581,7 +581,7 @@ public class BytesMessageIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws
Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 954938f0..60b9c81c 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -440,7 +440,7 @@ public class MapMessageIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void testAsyncCompletionSendMarksMapMessageReadOnly() throws
Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 60f3020d..19bcf698 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -2231,7 +2231,7 @@ public class MessageIntegrationTest extends
QpidJmsTestCase
@Timeout(20)
public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception
{
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index ebe2dcec..5b6911ef 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -660,6 +660,7 @@ public class ObjectMessageIntegrationTest extends
QpidJmsTestCase {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
+ connection.setCloseTimeout(10);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index e01c6a4f..4dd6b01d 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2322,7 +2322,7 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void testAsyncCompletionGetsNotifiedWhenSessionClosed() throws
Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=100");
testPeer.expectBegin();
testPeer.expectSenderAttach();
@@ -2343,6 +2343,8 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
producer.send(message, listener);
+ assertFalse(listener.hasCompleted()); // Close should complete it
as failed on timeout
+
session.close();
assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
@@ -2356,11 +2358,51 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void
testAsyncCompletionGetsNotifiedWhenSessionClosedAndWaitForCompletion() throws
Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher(),
nullValue(), false, true, new Accepted(), true, 0, 100);
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener();
+
+ producer.send(message, listener);
+
+ assertFalse(listener.hasCompleted()); // Close should complete it
as accepted after the delay
+
+ session.close();
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
@Test
@Timeout(20)
public void testAsyncCompletionGetsNotifiedWhenConnectionClosed() throws
Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
testPeer.expectBegin();
testPeer.expectSenderAttach();
@@ -2380,6 +2422,8 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
producer.send(message, listener);
+ assertFalse(listener.hasCompleted());
+
connection.close();
assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
@@ -2391,6 +2435,43 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void
testAsyncCompletionAllowedToCompleteNormallyWhenConnectionClosed() throws
Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher(),
nullValue(), false, true, new Accepted(), true, 0, 100);
+ testPeer.expectClose();
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener();
+
+ producer.send(message, listener);
+
+ assertFalse(listener.hasCompleted());
+
+ connection.close();
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
@Test
@Timeout(20)
public void testAsyncCompletionResetsBytesMessage() throws Exception {
@@ -2856,7 +2937,7 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void
testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer()
throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
testPeer.expectBegin();
testPeer.expectSenderAttach();
@@ -2895,7 +2976,7 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
assertFalse(listener.awaitCompletion(10, TimeUnit.MILLISECONDS),
"Should not get async callback");
- // Closing the session should complete the send with an exception
+ // Closing the session should complete the send with an exception
after timeout
testPeer.expectEnd();
session.close();
@@ -3044,6 +3125,10 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
this.completed = new CountDownLatch(expected);
}
+ public boolean hasCompleted() {
+ return completed.getCount() == 0;
+ }
+
public boolean awaitCompletion(long timeout, TimeUnit units) throws
InterruptedException {
return completed.await(timeout, units);
}
@@ -3239,4 +3324,74 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+ @Test
+ @Timeout(20)
+ public void testRemotelyEndConnectionCompletesAsyncSends() throws
Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final CountDownLatch connectionClosed = new CountDownLatch(1);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new
JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionFailure(Throwable exception) {
+ connectionClosed.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely close the connection afterwards.
+ testPeer.expectSenderAttach();
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageProducer producer = session.createProducer(queue);
+
+ final int MSG_COUNT = 3;
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ testPeer.expectTransferButDoNotRespond(new
TransferPayloadCompositeMatcher());
+ }
+
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener(MSG_COUNT);
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ Message message = session.createTextMessage("content");
+ producer.send(message, listener);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ testPeer.expectSenderAttach();
+ testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_DELETED,
BREAD_CRUMB, 50);
+
+ session.createProducer(queue);
+
+ // Verify the connection failure gets reported to the listener
+ assertTrue(connectionClosed.await(5, TimeUnit.SECONDS), "Session
closed callback didn't trigger");
+
+ try {
+ producer.getDeliveryMode();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ String errorMessage = jmsise.getCause().getMessage();
+
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+ assertTrue(errorMessage.contains(BREAD_CRUMB));
+ }
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertEquals(MSG_COUNT, listener.errorCount); // All sends should
have been failed
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
}
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 98a12eca..c8d98145 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -2333,6 +2333,10 @@ public class SessionIntegrationTest extends
QpidJmsTestCase {
completed = new CountDownLatch(expected);
}
+ public boolean hasCompleted() {
+ return completed.getCount() == 0;
+ }
+
public boolean awaitCompletion(long timeout, TimeUnit units) throws
InterruptedException {
return completed.await(timeout, units);
}
@@ -2836,6 +2840,82 @@ public class SessionIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void testRemotelyEndSessionCompletesAsyncSends() throws Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final CountDownLatch sessionClosed = new CountDownLatch(1);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new
JmsDefaultConnectionListener() {
+ @Override
+ public void onSessionClosed(Session session, Throwable
exception) {
+ sessionClosed.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end the session afterwards.
+ testPeer.expectSenderAttach();
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageProducer producer = session.createProducer(queue);
+
+ final int MSG_COUNT = 3;
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ testPeer.expectTransferButDoNotRespond(new
TransferPayloadCompositeMatcher());
+ }
+
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener(MSG_COUNT);
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ Message message = session.createTextMessage("content");
+ producer.send(message, listener);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ assertFalse(listener.hasCompleted());
+
+ testPeer.expectSenderAttach();
+ testPeer.remotelyEndLastOpenedSession(true, 50,
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+ session.createProducer(queue);
+
+ // Verify the session gets marked closed
+ assertTrue(sessionClosed.await(5, TimeUnit.SECONDS), "Session
closed callback didn't trigger");
+
+ try {
+ producer.getDeliveryMode();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ String errorMessage = jmsise.getCause().getMessage();
+
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+ assertTrue(errorMessage.contains(BREAD_CRUMB));
+ }
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertEquals(MSG_COUNT, listener.errorCount); // All sends should
have been failed
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ session.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
private boolean verifyConsumerClosure(final String BREAD_CRUMB, final
MessageConsumer consumer) throws Exception {
return Wait.waitFor(new Wait.Condition() {
@Override
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index b7906e48..3b09dfc7 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -439,6 +439,7 @@ public class StreamMessageIntegrationTest extends
QpidJmsTestCase {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
+ connection.setSendTimeout(15);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index 4653447c..1d907c6b 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -447,6 +447,7 @@ public class TextMessageIntegrationTest extends
QpidJmsTestCase {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
+ connection.setCloseTimeout(15);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 9ae890f7..a6e350a8 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -4558,6 +4558,69 @@ public class FailoverIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void testFailoverDoesFailPendingAsyncCompletionSend() throws
Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectSenderAttach();
+ originalPeer.expectTransferButDoNotRespond(new
TransferPayloadCompositeMatcher());
+ originalPeer.dropAfterLastHandler(15); // Wait for sender to get
into wait state
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+
+ final JmsConnection connection =
establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer,
finalPeer);
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener();
+
+ try {
+ producer.send(message, listener);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed the async completion send.");
+ }
+
+ // The original peer drops the connection without responding to the transfer, so the
+ // completion is expected to fire with an error rather than a success.
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
+ assertNotNull(listener.exception, "Completion should have been due
to error");
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ finalPeer.waitForAllHandlersToComplete(5000);
+ finalPeer.expectClose();
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(5000);
+ }
+ }
+
@Test
@Timeout(20)
public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws
Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]