This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1633b8bf7e ARTEMIS-5215 Credit handler reset in progress flag when
stopped
1633b8bf7e is described below
commit 1633b8bf7e605a7893d97ec8c8d6ff1ba7aac830
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jan 30 18:18:32 2025 -0500
ARTEMIS-5215 Credit handler reset in progress flag when stopped
When a pull mode federation consumer is stopped due to demand being
removed and the credit top-up handler is also awaiting the Queue backlog
to clear in order to grant a new batch of credit, it might exit that
cycle and leave the in-progress flag set to true. Currently this likely
won't trigger a stuck consumer, but if the code were to be altered to hold
open a link for some period of time before fully closing in order to
avoid needless attach / detach cycles then it would be possible for the
credit replenishment to get stuck because the previous attempt left the
in-progress flag set to true.
---
.../federation/AMQPFederationQueueConsumer.java | 17 ++--
.../connect/AMQPFederationQueuePolicyTest.java | 98 ++++++++++++++++++++++
2 files changed, 108 insertions(+), 7 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index a56da8c46b..d5197b9864 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -424,14 +424,17 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
private void performCreditTopUp() {
connection.requireInHandler();
- if (!isStarted() || receiver.getLocalState() != EndpointState.ACTIVE)
{
- return; // Closed or stopped before this was triggered.
- }
+ try {
+ if (!isStarted() || receiver.getLocalState() !=
EndpointState.ACTIVE) {
+ return; // Closed or stopped before this was triggered.
+ }
- receiver.flow(configuration.getPullReceiverBatchSize());
- connection.instantFlush();
- lastBacklogCheckDelay = 0;
- creditTopUpInProgress.set(false);
+ receiver.flow(configuration.getPullReceiverBatchSize());
+ connection.instantFlush();
+ lastBacklogCheckDelay = 0;
+ } finally {
+ creditTopUpInProgress.set(false);
+ }
}
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index df34112e61..7a843c4c63 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -4231,6 +4231,7 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
connection.start();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withLinkCredit(1000).withDrain(true); // Don't
answer drained
}
@@ -4261,6 +4262,103 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testPullModeFederationLinksRestartedBeforeRemoteFinishesDraining() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respondInKind();
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationQueuePolicyElement receiveFromQueue = new
AMQPFederationQueuePolicyElement();
+ receiveFromQueue.setName("queue-policy");
+ receiveFromQueue.addToIncludes(getTestName(), getTestName());
+ receiveFromQueue.addProperty(RECEIVER_CREDITS, 0);
+ receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, 1);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalQueuePolicy(receiveFromQueue);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("queue-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(1);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory(
+ "AMQP", "tcp://localhost:" + AMQP_PORT +
"?jms.prefetchPolicy.all=0");
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createQueue(getTestName()));
+
+ session.createConsumer(session.createQueue(getTestName()));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true); // Don't
answer drained
+
+ // Adds backlog on the Queue, next federation consumer restart
will wait for credit
+ // until the backlog is removed.
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createQueue(getTestName()));
+
+ connection.start();
+
+ // Now answer the drained and expect that the queue policy manager
will then
+ // restart flow of credit based on new demand as expected above.
+
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(1).withDrain(true).now();
+
+ // This should result from the answer to the next flow that drains
the link.
+ peer.expectFlow().withLinkCredit(1);
+
+ assertNotNull(consumer.receiveNoWait()); // Remove backlog to
allow credit to federation consumer
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(2).withDrain(true);
+ peer.expectDetach().respond();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
@Test
@Timeout(20)
public void testFederationLinksDetachesAfterLinkQuiesceTimeout() throws
Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact