This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 1633b8bf7e ARTEMIS-5215 Credit handler reset in progress flag when 
stopped
1633b8bf7e is described below

commit 1633b8bf7e605a7893d97ec8c8d6ff1ba7aac830
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jan 30 18:18:32 2025 -0500

    ARTEMIS-5215 Credit handler reset in progress flag when stopped
    
    When a pull mode federation consumer is stopped due to demand being
    removed and the credit tup-up handler is also awaiting the Queue backlog
    to clear in order to grant a new batch of credit it might exit that
    cycle an leave the in-progress flag set to true. Currently this likely
    won't trigger a stuck consumer but if the code was to be altered to hold
    open a link for some period of time before fully closing in order to
    avoid needless attach / detach cycles then it would be possible for the
    credit replenishment to get stuck because the previous attempt left the
    in-progress flag set to true.
---
 .../federation/AMQPFederationQueueConsumer.java    | 17 ++--
 .../connect/AMQPFederationQueuePolicyTest.java     | 98 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 7 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index a56da8c46b..d5197b9864 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -424,14 +424,17 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
       private void performCreditTopUp() {
          connection.requireInHandler();
 
-         if (!isStarted() || receiver.getLocalState() != EndpointState.ACTIVE) 
{
-            return; // Closed or stopped before this was triggered.
-         }
+         try {
+            if (!isStarted() || receiver.getLocalState() != 
EndpointState.ACTIVE) {
+               return; // Closed or stopped before this was triggered.
+            }
 
-         receiver.flow(configuration.getPullReceiverBatchSize());
-         connection.instantFlush();
-         lastBacklogCheckDelay = 0;
-         creditTopUpInProgress.set(false);
+            receiver.flow(configuration.getPullReceiverBatchSize());
+            connection.instantFlush();
+            lastBacklogCheckDelay = 0;
+         } finally {
+            creditTopUpInProgress.set(false);
+         }
       }
    }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index df34112e61..7a843c4c63 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -4231,6 +4231,7 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
 
             connection.start();
 
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectFlow().withLinkCredit(1000).withDrain(true);  // Don't 
answer drained
          }
 
@@ -4261,6 +4262,103 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testPullModeFederationLinksRestartedBeforeRemoteFinishesDraining() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .respondInKind();
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final AMQPFederationQueuePolicyElement receiveFromQueue = new 
AMQPFederationQueuePolicyElement();
+         receiveFromQueue.setName("queue-policy");
+         receiveFromQueue.addToIncludes(getTestName(), getTestName());
+         receiveFromQueue.addProperty(RECEIVER_CREDITS, 0);
+         receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, 1);
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalQueuePolicy(receiveFromQueue);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress(getTestName())
+                                                                
.setAutoCreated(false));
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("queue-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(1);
+
+         final ConnectionFactory factory = CFUtil.createConnectionFactory(
+            "AMQP", "tcp://localhost:" + AMQP_PORT + 
"?jms.prefetchPolicy.all=0");
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createQueue(getTestName()));
+
+            session.createConsumer(session.createQueue(getTestName()));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(1).withDrain(true);  // Don't 
answer drained
+
+            // Adds backlog on the Queue, next federation consumer restart 
will wait for credit
+            // until the backlog is removed.
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getTestName()));
+
+            connection.start();
+
+            // Now answer the drained and expect that the queue policy manager 
will then
+            // restart flow of credit based on new demand as expected above.
+            
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(1).withDrain(true).now();
+
+            // This should result from the answer to the next flow that drains 
the link.
+            peer.expectFlow().withLinkCredit(1);
+
+            assertNotNull(consumer.receiveNoWait()); // Remove backlog to 
allow credit to federation consumer
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(1).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(2).withDrain(true);
+            peer.expectDetach().respond();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
    @Test
    @Timeout(20)
    public void testFederationLinksDetachesAfterLinkQuiesceTimeout() throws 
Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to