This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/main by this push:
new 8ab821d QPIDJMS-534 Fail pending resource creation calls on
connection drop
8ab821d is described below
commit 8ab821db90506d5900c313260d6666fb8b6c0fe1
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Apr 27 12:22:47 2021 -0400
QPIDJMS-534 Fail pending resource creation calls on connection drop
Respond quickly to resource creation failures on connection drop instead
of waiting for a configured request timeout to avoid stuck calls to
create during reconnection processing.
---
.../qpid/jms/provider/amqp/AmqpProvider.java | 22 +-
.../amqp/builders/AmqpResourceBuilder.java | 10 +-
.../provider/failover/FailoverIntegrationTest.java | 267 +++++++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 33 +++
4 files changed, 328 insertions(+), 4 deletions(-)
diff --git
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 59aa383..8058678 100644
---
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -160,6 +160,7 @@ public class AmqpProvider implements Provider,
TransportListener , AmqpResourceP
private final ProviderFutureFactory futureFactory;
private AsyncResult connectionRequest;
private ScheduledFuture<?> nextIdleTimeoutCheck;
+ private List<AsyncResult> failOnConnectionDropList = new ArrayList<>();
/**
* Create a new instance of an AmqpProvider bonded to the given remote URI.
@@ -1146,8 +1147,17 @@ public class AmqpProvider implements Provider,
TransportListener , AmqpResourceP
failureCause = ex;
ProviderListener listener = this.listener;
- if (listener != null) {
-
listener.onConnectionFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(ex));
+ try {
+ if (listener != null) {
+
listener.onConnectionFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(ex));
+ }
+ } finally {
+ // Alert the request to the failure and then afterwards clean up
any stragglers that have not
+ // been altered to the provider having failed to avoid any
lingering blocked resource create
+ // calls and possibly others as needed.
+ for (AsyncResult request : failOnConnectionDropList) {
+ request.onFailure(ex);
+ }
}
}
@@ -1546,6 +1556,14 @@ public class AmqpProvider implements Provider,
TransportListener , AmqpResourceP
return null;
}
+ public void addToFailOnConnectionDropTracking(AsyncResult result) {
+ failOnConnectionDropList.add(result);
+ }
+
+ public void removeFromFailOnConnectionDropTracking(AsyncResult result) {
+ failOnConnectionDropList.remove(result);
+ }
+
//----- Internal implementation
------------------------------------------//
private void checkClosedOrFailed() throws ProviderException {
diff --git
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 408bda7..5dee9c0 100644
---
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -54,10 +54,12 @@ public abstract class AmqpResourceBuilder<TARGET extends
AmqpResource, PARENT ex
protected ENDPOINT endpoint;
protected final PARENT parent;
protected final INFO resourceInfo;
+ protected final AmqpProvider provider;
public AmqpResourceBuilder(PARENT parent, INFO resourceInfo) {
this.parent = parent;
this.resourceInfo = resourceInfo;
+ this.provider = parent.getProvider();
}
/**
@@ -72,6 +74,9 @@ public abstract class AmqpResourceBuilder<TARGET extends
AmqpResource, PARENT ex
public void buildResource(final AsyncResult request) {
this.request = request;
+ // Store the request with the provider for failure if connection drops
+ provider.addToFailOnConnectionDropTracking(request);
+
// Create the local end of the manage resource.
endpoint = createEndpoint(resourceInfo);
endpoint.setContext(this);
@@ -80,8 +85,6 @@ public abstract class AmqpResourceBuilder<TARGET extends
AmqpResource, PARENT ex
// Create the resource object now
resource = createResource(parent, resourceInfo, endpoint);
- AmqpProvider provider = parent.getProvider();
-
if (getRequestTimeout() > JmsConnectionInfo.INFINITE) {
// Attempt to schedule a cancellation of the pending open request,
can return
@@ -147,6 +150,7 @@ public abstract class AmqpResourceBuilder<TARGET extends
AmqpResource, PARENT ex
//----- Standard open and close handlers
---------------------------------//
protected final void handleOpened(AmqpProvider provider) {
+ provider.removeFromFailOnConnectionDropTracking(request);
// perform any post open processing prior to opened state inspection.
afterOpened();
@@ -172,6 +176,8 @@ public abstract class AmqpResourceBuilder<TARGET extends
AmqpResource, PARENT ex
}
protected final void handleClosed(AmqpProvider provider, ProviderException
cause) {
+ provider.removeFromFailOnConnectionDropTracking(request);
+
// If the resource being built is closed during the creation process
// then this is always an error.
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 805ae70..4e0733b 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1877,6 +1877,42 @@ public class FailoverIntegrationTest extends
QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testFailoverEnforcesRequestTimeoutSessionWhenBeginSent()
throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+
+ // Create a peer to connect to so we can get to a state where we
+ // can try to send when offline.
+ final String peerURI = createPeerURI(testPeer);
+
+ LOG.info("Original peer is at: {}", peerURI);
+
+ // Connect to the test peer
+ testPeer.expectSaslAnonymous();
+ testPeer.expectOpen();
+ testPeer.expectBegin();
+ testPeer.expectBegin(false);
+ testPeer.dropAfterLastHandler();
+
+ final JmsConnection connection = establishAnonymousConnecton(
+
"jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=30",
testPeer);
+ connection.start();
+
+ try {
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fail("Should have thrown an exception");
+ } catch (JmsOperationTimedOutException jmsEx) {
+ LOG.info("Caught timed out exception from send:", jmsEx);
+ } catch (Exception ex) {
+ fail("Should have caught a timed out exception");
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testFailoverEnforcesSendTimeout() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
@@ -4914,6 +4950,237 @@ public class FailoverIntegrationTest extends
QpidJmsTestCase {
finalPeer.waitForAllHandlersToComplete(1000); }
}
+ @Test(timeout = 20000)
+ public void testSessionCreationRecoversAfterDropWithNoBeginResponse()
throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final String content = "myContent";
+ final DescribedType amqpValueNullContent = new
AmqpValueDescribedType(content);
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin(false);
+ originalPeer.dropAfterLastHandler(20);
+
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlowRespondWithTransfer(null, null, null,
null, amqpValueNullContent);
+ finalPeer.expectDispositionThatIsAcceptedAndSettled();
+ finalPeer.expectClose();
+
+ final JmsConnection connection =
establishAnonymousConnecton(originalPeer, finalPeer);
+
+ try {
+ connection.start();
+ } catch (Exception ex) {
+ fail("Should not have thrown an Exception: " + ex);
+ }
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(2000);
+
+ connection.close();
+
+ originalPeer.waitForAllHandlersToCompleteNoAssert(1000);
+ finalPeer.waitForAllHandlersToComplete(1000);
+
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+ assertEquals(content, ((TextMessage) message).getText());
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testMultipleSessionCreationRecoversAfterDropWithNoBeginResponseAndFailedRecoveryAttempt()
throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer intermediatePeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final String content = "myContent";
+ final DescribedType amqpValueNullContent = new
AmqpValueDescribedType(content);
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin(false);
+ originalPeer.dropAfterLastHandler(20);
+
+ intermediatePeer.expectSaslAnonymous();
+ intermediatePeer.expectOpen();
+ intermediatePeer.expectBegin();
+ intermediatePeer.expectBegin(false);
+ intermediatePeer.dropAfterLastHandler();
+
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlowRespondWithTransfer(null, null, null,
null, amqpValueNullContent);
+ finalPeer.expectDispositionThatIsAcceptedAndSettled();
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlowRespondWithTransfer(null, null, null,
null, amqpValueNullContent);
+ finalPeer.expectDispositionThatIsAcceptedAndSettled();
+ finalPeer.expectClose();
+
+ final JmsConnection connection =
establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer);
+
+ try {
+ connection.start();
+ } catch (Exception ex) {
+ fail("Should not have thrown an Exception: " + ex);
+ }
+
+ Session session1 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Session session2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue1 = session1.createQueue("myQueue");
+ MessageConsumer consumer1 = session1.createConsumer(queue1);
+ Message message1 = consumer1.receive(2000);
+
+ Queue queue2 = session2.createQueue("myQueue");
+ MessageConsumer consumer2 = session2.createConsumer(queue2);
+ Message message2 = consumer2.receive(2000);
+
+ connection.close();
+
+ originalPeer.waitForAllHandlersToComplete(1000);
+ intermediatePeer.waitForAllHandlersToComplete(1000);
+ finalPeer.waitForAllHandlersToComplete(1000);
+
+ assertNotNull(message1);
+ assertTrue(message1 instanceof TextMessage);
+ assertEquals(content, ((TextMessage) message1).getText());
+ assertNotNull(message2);
+ assertTrue(message2 instanceof TextMessage);
+ assertEquals(content, ((TextMessage) message2).getText());
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testMultipleSenderCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt()
throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer intermediatePeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectSenderAttach();
+ originalPeer.expectSenderAttachButDoNotRespond();
+ originalPeer.dropAfterLastHandler(20);
+
+ intermediatePeer.expectSaslAnonymous();
+ intermediatePeer.expectOpen();
+ intermediatePeer.expectBegin();
+ intermediatePeer.expectBegin();
+ intermediatePeer.expectSenderAttachButDoNotRespond();
+ intermediatePeer.dropAfterLastHandler();
+
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ finalPeer.expectSenderAttach();
+ finalPeer.expectClose();
+
+ final JmsConnection connection =
establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer);
+
+ try {
+ connection.start();
+ } catch (Exception ex) {
+ fail("Should not have thrown an Exception: " + ex);
+ }
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer1 = session.createProducer(queue);
+ MessageProducer producer2 = session.createProducer(queue);
+
+ assertNotNull(producer1);
+ assertNotNull(producer2);
+
+ assertEquals(queue, producer1.getDestination());
+ assertEquals(queue, producer2.getDestination());
+
+ connection.close();
+
+ originalPeer.waitForAllHandlersToComplete(1000);
+ intermediatePeer.waitForAllHandlersToComplete(1000);
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testSenderAndReceiverCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt()
throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer intermediatePeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectSenderAttach();
+ originalPeer.expectReceiverAttachButDoNotRespond();
+ originalPeer.dropAfterLastHandler(20);
+
+ intermediatePeer.expectSaslAnonymous();
+ intermediatePeer.expectOpen();
+ intermediatePeer.expectBegin();
+ intermediatePeer.expectBegin();
+ intermediatePeer.expectSenderAttachButDoNotRespond();
+ intermediatePeer.dropAfterLastHandler(10);
+
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlow();
+ finalPeer.expectClose();
+
+ final JmsConnection connection =
establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer);
+
+ try {
+ connection.start();
+ } catch (Exception ex) {
+ fail("Should not have thrown an Exception: " + ex);
+ }
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ assertNotNull(producer);
+ assertNotNull(consumer);
+
+ assertEquals(queue, producer.getDestination());
+ assertNull(consumer.getMessageListener());
+
+ connection.close();
+
+ originalPeer.waitForAllHandlersToComplete(1000);
+ intermediatePeer.waitForAllHandlersToComplete(1000);
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers)
throws JMSException {
return establishAnonymousConnecton(null, null, peers);
}
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 6d54148..388323b 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1019,6 +1019,11 @@ public class TestAmqpPeer implements AutoCloseable
expectBegin(notNullValue(), true);
}
+ public void expectBegin(boolean sendResponse)
+ {
+ expectBegin(notNullValue(), sendResponse);
+ }
+
public void expectBegin(Matcher<?> outgoingWindowMatcher, boolean
sendResponse)
{
final BeginMatcher beginMatcher = new BeginMatcher()
@@ -1240,6 +1245,20 @@ public class TestAmqpPeer implements AutoCloseable
addHandler(attachMatcher);
}
+ public void expectSenderAttachButDoNotRespond()
+ {
+ final AttachMatcher attachMatcher = new AttachMatcher()
+ .withName(notNullValue())
+ .withHandle(notNullValue())
+ .withRole(equalTo(Role.SENDER))
+ .withSndSettleMode(Matchers.oneOf(SenderSettleMode.SETTLED,
SenderSettleMode.UNSETTLED))
+ .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
+ .withSource(notNullValue())
+ .withTarget(notNullValue());
+
+ addHandler(attachMatcher);
+ }
+
public void expectSenderAttach()
{
expectSenderAttach(notNullValue(), false, false);
@@ -1433,6 +1452,20 @@ public class TestAmqpPeer implements AutoCloseable
expectReceiverAttach(notNullValue(), notNullValue(), true);
}
+ public void expectReceiverAttachButDoNotRespond()
+ {
+ final AttachMatcher attachMatcher = new AttachMatcher()
+ .withName(notNullValue())
+ .withHandle(notNullValue())
+ .withRole(equalTo(Role.RECEIVER))
+ .withSndSettleMode(Matchers.oneOf(SenderSettleMode.SETTLED,
SenderSettleMode.UNSETTLED))
+ .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
+ .withSource(notNullValue())
+ .withTarget(notNullValue());
+
+ addHandler(attachMatcher);
+ }
+
public void expectReceiverAttach()
{
expectReceiverAttach(notNullValue(), notNullValue());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]