This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 69fe3a147f ARTEMIS-6031 Handle credit starvation affecting Core bridge
69fe3a147f is described below

commit 69fe3a147f72cbc57685fa554a97569ec85ce1b2
Author: AntonRoskvist <[email protected]>
AuthorDate: Wed Apr 29 09:01:26 2026 +0200

    ARTEMIS-6031 Handle credit starvation affecting Core bridge
---
 .../client/impl/AbstractProducerCreditsImpl.java   |  4 +-
 .../impl/AsynchronousProducerCreditsImplTest.java  | 48 +++++++++++
 .../integration/cluster/bridge/BridgeTest.java     | 94 ++++++++++++++++++++++
 3 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
index ac83c29b1a..8e9381eead 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -162,7 +162,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
       int toRequest = -1;
 
       synchronized (this) {
-         if (getBalance() + arriving < needed) {
+         if (getBalance() + arriving <= needed) {
             toRequest = needed - arriving;
 
             if (logger.isTraceEnabled()) {
@@ -170,7 +170,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
             }
          } else {
             if (logger.isTraceEnabled()) {
-               logger.trace("CheckCredits did not need it, balance={}, arriving={},  needed={}, getbalance + arriving < needed={}", getBalance(), arriving, needed, (boolean)(getBalance() + arriving < needed));
+               logger.trace("CheckCredits did not need it, balance={}, arriving={},  needed={}, getbalance + arriving <= needed={}", getBalance(), arriving, needed, (boolean)(getBalance() + arriving <= needed));
             }
          }
       }
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java
index 7783894764..8e32131eed 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java
@@ -17,10 +17,16 @@
 
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.Mockito;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class AsynchronousProducerCreditsImplTest {
 
    @Test
@@ -32,4 +38,46 @@ public class AsynchronousProducerCreditsImplTest {
       Mockito.verify(session).sendProducerCreditsMessage(0, null);
    }
 
+   @Test
+   @Timeout(10)
+   public void testCreditsRequestedWhenMessageSizeExactlyEqualsBalance() throws Exception {
+      ClientSessionInternal mockClientSession = Mockito.mock(ClientSessionInternal.class);
+
+      AtomicInteger creditsRequested = new AtomicInteger(0);
+      AtomicBoolean blocked = new AtomicBoolean(false);
+
+      int producerWindowSize = 1000;
+
+      Mockito.doAnswer(inv -> {
+         creditsRequested.addAndGet(inv.getArgument(0));
+         return null;
+      }).when(mockClientSession).sendProducerCreditsMessage(Mockito.anyInt(), Mockito.any());
+
+      AsynchronousProducerCreditsImpl producerCredits = new AsynchronousProducerCreditsImpl(mockClientSession, null, producerWindowSize, new ClientProducerFlowCallback() {
+
+         @Override
+         public void onCreditsFlow(boolean isBlocked, ClientProducerCredits credits) {
+            blocked.set(isBlocked);
+         }
+
+         @Override
+         public void onCreditsFail(ClientProducerCredits credits) {
+         }
+
+      });
+
+      int internalWindowSize = producerWindowSize / 2;
+      // drain balance to just above internalWindowSize
+      producerCredits.actualAcquire(internalWindowSize - 100);
+
+      int messageSize = producerCredits.getBalance();
+      producerCredits.acquireCredits(messageSize);
+
+      assertTrue(creditsRequested.get() > 0, "credits must be requested when message size exactly equals balance");
+
+      producerCredits.receiveCredits(creditsRequested.get());
+      assertTrue(producerCredits.getBalance() > 0);
+      assertFalse(blocked.get());
+   }
+
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 65b9264648..e5d3202acc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -270,6 +270,100 @@ public class BridgeTest extends ActiveMQTestBase {
       long timeTaken = System.currentTimeMillis() - time;
    }
 
+   @TestTemplate
+   public void testBridgeHandlesReachingZeroCredits() throws Exception {
+      final int numMessages = 10;
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      final String bridgeName = "bridge0";
+
+      final int flowControlSize = 1024 * 10;
+      final AtomicInteger messageForwardSize = new AtomicInteger(0);
+
+      //This is used to capture the actual message size that the bridge will send
+      Transformer transformer = message -> {
+         messageForwardSize.addAndGet(message.getEncodeSize());
+         return message;
+      };
+
+      Map<String, Object> server0Params = new HashMap<>();
+      Map<String, Object> server1Params = new HashMap<>();
+
+      addTargetParameters(server1Params);
+
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+      server0.getServiceRegistry().addBridgeTransformer(bridgeName, transformer);
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+      server0.getConfiguration().setConnectorConfigurations(Map.of(server1tc.getName(), server1tc));
+
+      server0.start();
+      server1.start();
+
+      server0.createQueue(QueueConfiguration.of(queueName0).setAddress(testAddress));
+      server1.createQueue(QueueConfiguration.of(queueName1).setAddress(forwardAddress));
+
+      locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+
+      server0.deployBridge(new BridgeConfiguration()
+                              .setName(bridgeName)
+                              .setQueueName(queueName0)
+                              .setForwardingAddress(forwardAddress)
+                              .setProducerWindowSize(flowControlSize)
+                              .setStaticConnectors(List.of(server1tc.getName())));
+
+      ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+      ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+
+      ClientSession session0 = sf0.createSession(false, true, true);
+      ClientSession session1 = sf1.createSession(false, true, true);
+      session1.start();
+
+      ClientProducer producer0 = session0.createProducer(SimpleString.of(testAddress));
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+      // send empty message to capture the size of a base message + bridge property
+      producer0.send(session0.createMessage(true));
+      Wait.assertTrue(() -> messageForwardSize.get() > 0);
+
+      // messageForwardSize is multiplied by two to account for previous and upcoming message
+      // the intention is to land on exactly 0 credits after the next message is sent
+      int sendSize = flowControlSize - (messageForwardSize.get() * 2);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+         message.getBodyBuffer().writeBytes(new byte[sendSize]);
+         producer0.send(message);
+      }
+
+      for (int i = 0; i < numMessages + 1; i++) {
+         ClientMessage message = consumer1.receive(1000);
+         assertNotNull(message);
+         message.acknowledge();
+      }
+
+      assertNull(consumer1.receiveImmediate());
+
+      session0.close();
+      session1.close();
+      sf0.close();
+      sf1.close();
+
+      closeFields();
+
+      if (server0.getConfiguration().isPersistenceEnabled()) {
+         assertEquals(0, loadQueues(server0).size());
+      }
+
+   }
+
    @TestTemplate
    public void testBlockedBridgeAndReconnect() throws Exception {
       long time = System.currentTimeMillis();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to