This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new f7485a63c6 ARTEMIS-5536 Improving reliability of openwire flow control
f7485a63c6 is described below

commit f7485a63c61ab88deed55807748c7c815a910e0c
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jun 9 17:07:22 2025 -0400

    ARTEMIS-5536 Improving reliability of openwire flow control
    
    - AMQConsumer used to discount credits ahead of taking them on rollback, 
which would lead to negative values
    - AMQConsumer.close() was blocking an operation
    - fixing other branches for flow control
---
 .../artemis/utils/actors/ProcessorBase.java        |  23 +++-
 .../artemis/utils/actors/PauseActorTest.java       |  96 ++++++++++++++
 .../core/protocol/openwire/OpenWireConnection.java |   8 ++
 .../core/protocol/openwire/amq/AMQConsumer.java    |  95 +++++++++++---
 .../protocol/openwire/amq/AMQConsumerTest.java     |   2 +-
 .../openwire/amq/JMSPullConsumerTest.java          |  24 ++--
 .../openwire/amq/RedeliveryPolicyTest.java         |   2 +-
 .../openwire/amq/SimpleConsumerLargeCountTest.java | 140 +++++++++++++++++++++
 .../openwire/amq/ValidateAddressSizeTest.java      |   2 +-
 9 files changed, 358 insertions(+), 34 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 11d282075b..a0869a3bba 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -33,6 +33,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    public static final int STATE_NOT_RUNNING = 0;
    public static final int STATE_RUNNING = 1;
    public static final int STATE_FORCED_SHUTDOWN = 2;
+   public static final int STATE_PAUSED = 3;
 
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
@@ -47,7 +48,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    @SuppressWarnings("unused")
    private volatile int state = STATE_NOT_RUNNING;
    // Request of forced shutdown
-   private volatile boolean requestedForcedShutdown = false;
+   private volatile boolean pausedProcessing = false;
    // Request of educated shutdown:
    protected volatile boolean requestedShutdown = false;
    // Request to yield to another thread
@@ -63,8 +64,8 @@ public abstract class ProcessorBase<T> extends HandlerBase {
             try {
                T task;
                //while the queue is not empty we process in order:
-               //if requestedForcedShutdown==true than no new tasks will be 
drained from the tasks q.
-               while (!yielded && !requestedForcedShutdown && (task = 
tasks.poll()) != null) {
+               //if pausedProcessing==true than no new tasks will be drained 
from the tasks q.
+               while (!yielded && !pausedProcessing && (task = tasks.poll()) 
!= null) {
                   doTask(task);
                }
             } finally {
@@ -82,7 +83,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          //but poll() has returned null, so a submitting thread will believe 
that it does not need re-execute.
          //this check fixes the issue
       }
-      while (!tasks.isEmpty() && !requestedShutdown && !yielded);
+      while (!tasks.isEmpty() && !requestedShutdown && !yielded && 
!pausedProcessing);
 
       if (yielded) {
          yielded = false;
@@ -90,6 +91,18 @@ public abstract class ProcessorBase<T> extends HandlerBase {
       }
    }
 
+   public void pauseProcessing() {
+      pausedProcessing = true;
+      stateUpdater.set(this, STATE_PAUSED);
+   }
+
+   public void resumeProcessing() {
+      pausedProcessing = false;
+      if (stateUpdater.compareAndSet(this, STATE_PAUSED, STATE_NOT_RUNNING)) {
+         delegate.execute(task);
+      }
+   }
+
    /**
     * It will shutdown and wait 30 seconds for timeout.
     */
@@ -121,7 +134,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
     */
    public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, 
TimeUnit unit) {
       //alert anyone that has been requested (at least) an immediate shutdown
-      requestedForcedShutdown = true;
+      pausedProcessing = true;
       requestedShutdown = true;
       yielded = false;
 
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/PauseActorTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/PauseActorTest.java
new file mode 100644
index 0000000000..a2d8631179
--- /dev/null
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/PauseActorTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PauseActorTest {
+
+   Actor<Integer> actor;
+
+   HashSet<Integer> receivedValues = new HashSet<>();
+   CountDownLatch paused = new CountDownLatch(1);
+   CountDownLatch done = new CountDownLatch(1);
+
+   int firstPauseElements = 50;
+   int firstSend = 80;
+   int totalElements = 100;
+
+   public void doInteger(Integer received) {
+      receivedValues.add(received);
+      if (received.equals(Integer.valueOf(firstPauseElements - 1))) {
+         actor.pauseProcessing();
+         paused.countDown();
+      } else if (received.equals(Integer.valueOf(totalElements - 1))) {
+         done.countDown();
+      }
+   }
+
+   @Test
+   public void testPauseActor() throws Exception {
+      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+      try {
+         actor = new Actor<>(executorService, this::doInteger);
+
+         for (int i = 0; i < firstSend; i++) {
+            actor.act(i);
+         }
+
+         actor.flush();
+         assertTrue(paused.await(10, TimeUnit.SECONDS));
+
+         Wait.assertEquals(firstPauseElements, () -> receivedValues.size());
+
+         for (int i = firstSend; i < totalElements; i++) {
+            actor.act(i);
+         }
+
+         actor.flush();
+         // it should be unchanged even after adding more elements as the 
actor is paused
+         Wait.assertEquals(firstPauseElements, () -> receivedValues.size());
+
+         for (int i = 0; i < totalElements; i++) {
+            if (i < firstPauseElements) {
+               assertTrue(receivedValues.contains(i));
+            } else {
+               assertFalse(receivedValues.contains(i));
+            }
+         }
+
+         actor.resumeProcessing();
+         actor.flush();
+         assertTrue(done.await(10, TimeUnit.SECONDS));
+         Wait.assertEquals(totalElements, () -> receivedValues.size());
+
+         for (int i = 0; i < totalElements; i++) {
+            assertTrue(receivedValues.contains(i));
+         }
+      } finally {
+         executorService.shutdownNow();
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ba493f9295..cf405e4b19 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -208,6 +208,14 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    private String validatedUser = null;
 
+   public void block() {
+      openWireActor.pauseProcessing();
+   }
+
+   public void unblock() {
+      openWireActor.resumeProcessing();
+   }
+
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
                              OpenWireProtocolManager openWireProtocolManager,
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 4ac10382b5..ea8c4ecf04 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -38,6 +38,8 @@ import 
org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants;
 import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
@@ -263,9 +266,10 @@ public class AMQConsumer {
       return info.getConsumerId();
    }
 
-   public void acquireCredit(int n, boolean delivered) {
+   public void replenishCredit(int n, boolean delivered) {
       if (messagePullHandler.get() != null) {
-         //don't acquire any credits when the pull handler controls it!!
+         logger.debug("replenishCredit of {} ignored, as there's no 
messagePullHandler", n);
+         //don't replenish any credits when the pull handler controls it!!
          return;
       }
 
@@ -274,6 +278,7 @@ public class AMQConsumer {
       } else if (deliveredAcksCreditExtension > 0) {
          if (deliveredAcksCreditExtension < n) {
             n -= deliveredAcksCreditExtension;
+            logger.debug("Requested more credits than the limit, maximizing 
the request as {}", n);
             deliveredAcksCreditExtension = 0;
          } else {
             deliveredAcksCreditExtension -= n;
@@ -282,6 +287,9 @@ public class AMQConsumer {
       }
 
       int oldwindow = currentWindow.getAndAdd(n);
+      if (logger.isDebugEnabled()) {
+         logger.debug("replenished {} credits, current after replenish = {}, 
old ={}", n, currentWindow.get(), oldwindow);
+      }
 
       boolean promptDelivery = oldwindow < prefetchSize;
 
@@ -307,7 +315,10 @@ public class AMQConsumer {
          reference.setProtocolData(MessageId.class, 
dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
          // Prevent races with other updates that can lead to credit going 
negative and starving consumers.
-         currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+         currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Decremented credit, current={}", 
currentWindow.get());
+         }
          return size;
       } catch (Throwable t) {
          logger.warn("Error during message dispatch", t);
@@ -315,6 +326,17 @@ public class AMQConsumer {
       }
    }
 
+   static int decrementWindow(int value) {
+      if (value > 0) {
+         return value - 1;
+      } else {
+         if (logger.isDebugEnabled()) {
+            logger.debug("preventing the credit window from going negative, 
keeping value at {}", value, new Exception("trace"));
+         }
+         return value;
+      }
+   }
+
    public void handleDeliverNullDispatch() {
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(getId());
@@ -334,9 +356,15 @@ public class AMQConsumer {
          return;
       }
 
+      logger.debug("Acking {}", ack);
+
       final int ackMessageCount = ack.getMessageCount();
+
+      // this is for deliveredACK on OpenWire. Meaning the message is still 
being held by the client.
+      // it's just to request more credits
       if (ack.isDeliveredAck()) {
-         acquireCredit(ackMessageCount, true);
+         logger.debug("replinishCredit credit on {} credits, for 
DELIVERED_ACK_TYPE", ackMessageCount);
+         replenishCredit(ackMessageCount, true);
          // our work is done
          return;
       }
@@ -350,12 +378,13 @@ public class AMQConsumer {
 
       if (!ackList.isEmpty() || !removeReferences || 
serverConsumer.getQueue().isTemporary()) {
 
-         // valid match in delivered or browsing or temp - deal with credit
-         acquireCredit(ackMessageCount, false);
-
          if (ack.isExpiredAck()) {
             for (MessageReference ref : ackList) {
                ref.getQueue().expire(ref, serverConsumer, true);
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Acquiring 1 credit for EXPIRED_ACK_TYPE");
+               }
+               replenishCredit(1, false);
             }
          } else if (removeReferences) {
 
@@ -367,6 +396,14 @@ public class AMQConsumer {
             } else {
                transaction = originalTX;
             }
+            transaction.addOperation(new TransactionOperationAbstract() {
+               @Override
+               public void afterCommit(Transaction tx) {
+                  // these credits will only be requested if the commit 
actually succeeds
+                  logger.debug("replenishing {} credits after committed", 
ackMessageCount);
+                  replenishCredit(ackMessageCount, false);
+               }
+            });
 
             if (ack.isIndividualAck() || ack.isStandardAck()) {
                for (MessageReference ref : ackList) {
@@ -390,6 +427,9 @@ public class AMQConsumer {
             if (originalTX == null) {
                transaction.commit(true);
             }
+         } else {
+            logger.debug("Replenishing {} credits without any transaction 
(browsing perhaps)", ackMessageCount);
+            replenishCredit(ackMessageCount, false);
          }
       }
    }
@@ -413,6 +453,9 @@ public class AMQConsumer {
 
    public void processMessagePull(MessagePull messagePull) throws Exception {
       currentWindow.incrementAndGet();
+      if (logger.isDebugEnabled()) {
+         logger.debug("Incremented credit for processMessagePull, current = 
{}", currentWindow.get());
+      }
       MessagePullHandler pullHandler = messagePullHandler.get();
       if (pullHandler != null) {
          pullHandler.nextSequence(messagePullSequence++, 
messagePull.getTimeout());
@@ -424,10 +467,22 @@ public class AMQConsumer {
       if (delayedDispatchPrompter != null) {
          delayedDispatchPrompter.cancel(false);
       }
-      if (info.getPrefetchSize() > 1) {
-         // because response required is false on a RemoveConsumerCommand, a 
new consumer could miss canceled prefetched messages
-         // we await the operation context completion before handling a 
subsequent command
-         session.getCoreSession().getSessionContext().waitCompletion();
+      OpenWireConnection openWireConnection = session.getConnection();
+      if (openWireConnection != null) {
+         openWireConnection.block();
+         session.getCoreSession().getSessionContext().executeOnCompletion(new 
IOCallback() {
+            @Override
+            public void done() {
+               session.getConnection().unblock();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               // because response required is false on a 
RemoveConsumerCommand, a new consumer could miss canceled prefetched messages
+               // we await the operation context completion before handling a 
subsequent command
+               session.getConnection().unblock();
+            }
+         });
       }
    }
 
@@ -438,6 +493,9 @@ public class AMQConsumer {
    public void setPrefetchSize(int prefetchSize) {
       this.prefetchSize = prefetchSize;
       this.currentWindow.set(prefetchSize);
+      if (logger.isTraceEnabled()) {
+         logger.trace("Updating credits with a new prefetchSize", 
prefetchSize, new Exception("trace"));
+      }
       this.info.setPrefetchSize(prefetchSize);
       if (this.prefetchSize == 0) {
          messagePullHandler.compareAndSet(null, new MessagePullHandler());
@@ -493,7 +551,10 @@ public class AMQConsumer {
             if (next >= 0) {
                if (timeout <= 0) {
                   // Prevent races with other updates that can lead to credit 
going negative and starving consumers.
-                  currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+                  currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+                  if (logger.isDebugEnabled()) {
+                     logger.debug("UpdateCredit down on FORCED_DELIVERY, 
current after decrement={}", currentWindow.get());
+                  }
                   latch.countDown();
                } else {
                   messagePullFuture = scheduledPool.schedule(() -> {
@@ -502,7 +563,10 @@ public class AMQConsumer {
                         // can race with an actual message arriving so we must 
ensure we don't reduce
                         // credit below zero as we want credit to always be 
zero or on active pull it
                         // should be one (greater than one indicates a broken 
client implementation).
-                        currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+                        
currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+                        if (logger.isDebugEnabled()) {
+                           logger.debug("UpdateCredit down on FORCED_DELIVERY, 
current after decrement={}", currentWindow.get());
+                        }
                         handleDeliverNullDispatch();
                      }
                   }, timeout, TimeUnit.MILLISECONDS);
@@ -528,7 +592,10 @@ public class AMQConsumer {
    }
 
    public void addRolledback(MessageReference messageReference) {
-      currentWindow.decrementAndGet();
+      // Note:
+      // We used to call currentWindow.decrement here.
+      // this is not needed any more as we won't be taking credits unless a 
commit happened.
+
       getRolledbackMessageRefsOrCreate().add(messageReference);
    }
 
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
index c302f98aff..152dad95a1 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
@@ -121,7 +121,7 @@ public class AMQConsumerTest {
 
       assertFalse(consumer.hasCredits());
 
-      consumer.acquireCredit(1, true);
+      consumer.replenishCredit(1, true);
 
       assertTrue(consumer.hasCredits());
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
index 318af3233f..279b961315 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
@@ -61,19 +61,19 @@ public class JMSPullConsumerTest extends BasicOpenWireTest {
       destination = createDestination(session, destinationType);
       ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
 
-      assertNull(consumer.receive(100));
-      assertNull(consumer.receive(100));
-      assertNull(consumer.receive(100));
+      assertNull(consumer.receive(10));
+      assertNull(consumer.receive(10));
+      assertNull(consumer.receive(10));
 
       sendMessages(session, destination, 3);
 
-      assertNotNull(consumer.receive(100));
+      assertNotNull(consumer.receive(10));
 
       ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
       ActiveMQMessageConsumer consumer3 = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
 
-      assertNotNull(consumer2.receive(100));
-      assertNotNull(consumer3.receive(100));
+      assertNotNull(consumer2.receive(10));
+      assertNotNull(consumer3.receive(10));
    }
 
    @TestTemplate
@@ -86,17 +86,17 @@ public class JMSPullConsumerTest extends BasicOpenWireTest {
       destination = createDestination(session, destinationType);
       ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
 
-      assertNull(consumer.receive(100));
-      assertNull(consumer.receive(100));
-      assertNull(consumer.receive(100));
+      assertNull(consumer.receive(10));
+      assertNull(consumer.receive(10));
+      assertNull(consumer.receive(10));
 
       sendMessages(session, destination, 3);
 
       ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer) 
session.createConsumer(destination);
 
-      assertNotNull(consumer2.receive(100));
-      assertNotNull(consumer2.receive(100));
-      assertNotNull(consumer2.receive(100));
+      assertNotNull(consumer2.receive(1000));
+      assertNotNull(consumer2.receive(1000));
+      assertNotNull(consumer2.receive(1000));
 
       assertNull(consumer.receiveNoWait());
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 951f66960c..02e146f912 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -474,7 +474,7 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest 
{
          assertNotNull(m, "null@:" + i);
          session.commit();
 
-         assertTrue(queueControl.getDeliveringCount() <= prefetchSize + 1, 
"deliveryCount: " + queueControl.getDeliveringCount() + " @:" + i);
+         assertTrue(queueControl.getDeliveringCount() <= prefetchSize, 
"deliveringCount: " + queueControl.getDeliveringCount() + " @:" + i);
       }
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/SimpleConsumerLargeCountTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/SimpleConsumerLargeCountTest.java
new file mode 100644
index 0000000000..f88b6694b0
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/SimpleConsumerLargeCountTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This test is to make sure a consumer on a transaction session, would 
receive all messages even
+ * when the number of messages is bigger than prefetch.
+ * <p>
+ * Basically I am making sure flow control will still issue credits even 
thought the messages are not removed with a (ACK mode = remove)
+ */
+public class SimpleConsumerLargeCountTest extends BasicOpenWireTest {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Test
+   public void testTopicReceiverFlowControlled() throws Exception {
+      Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+      ActiveMQTopic destination = (ActiveMQTopic) 
this.createDestination(session, ActiveMQDestination.TOPIC_TYPE);
+      connection.start();
+
+      // trying to starve the server if there's anything blocking on a create 
resources
+      int numberOfSubscriptions = 50;
+      // more messages than prefetch
+      int numberOfMessages = 500;
+      int rollbackAt = 50;
+
+      ExecutorService service = 
Executors.newFixedThreadPool(numberOfSubscriptions);
+      runAfter(service::shutdownNow);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      CountDownLatch done = new CountDownLatch(numberOfSubscriptions);
+
+      CyclicBarrier startFlag = new CyclicBarrier(numberOfSubscriptions + 1);
+      for (int dest = 0; dest < numberOfSubscriptions; dest++) {
+         final int finalDest = dest;
+         service.execute(() -> {
+            Connection connConsumer = null;
+            try {
+               connConsumer = factory.createConnection();
+               connConsumer.setClientID("client" + finalDest);
+               Session sessConsumer = connConsumer.createSession(true, 
Session.SESSION_TRANSACTED);
+               MessageConsumer consumer = 
sessConsumer.createDurableSubscriber(destination, "cons" + finalDest);
+               connConsumer.start();
+               startFlag.await(10, TimeUnit.SECONDS);
+
+               for (int rollbackNumber = 0; rollbackNumber < 2; 
rollbackNumber++) {
+                  for (int i = 0; i < rollbackAt; i++) {
+                     TextMessage message = (TextMessage) 
consumer.receive(5000);
+                     logger.debug("Received {}, dest={}", i, finalDest);
+                     assertNotNull(message);
+                     assertEquals(i, message.getIntProperty("i"));
+                  }
+                  // this is just to make the test more challenging. rollback 
destinations, make sure they won't block
+                  // and flow control would stlil be okay on sending more 
messages
+                  sessConsumer.rollback();
+               }
+
+               consumer.close();
+               consumer = sessConsumer.createDurableSubscriber(destination, 
"cons" + finalDest);
+
+
+               for (int i = 0; i < numberOfMessages; i++) {
+                  TextMessage message = (TextMessage) consumer.receive(5000);
+                  logger.debug("Received {}, dest={}", i, finalDest);
+                  assertNotNull(message);
+                  assertEquals(i, message.getIntProperty("i"));
+               }
+               sessConsumer.commit();
+               assertNull(consumer.receiveNoWait());
+
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               if (connConsumer != null) {
+                  try {
+                     connConsumer.close();
+                  } catch (Throwable ignored) {
+                  }
+               }
+               done.countDown();
+            }
+         });
+      }
+
+      startFlag.await(10, TimeUnit.SECONDS);
+
+      MessageProducer producer = session.createProducer(destination);
+      for (int i = 0; i < numberOfMessages; i++) {
+         TextMessage message = session.createTextMessage("Message: " + i);
+         message.setIntProperty("i", i);
+         producer.send(message);
+      }
+      session.commit();
+
+      assertTrue(done.await(1, TimeUnit.MINUTES));
+      assertEquals(0, errors.get());
+      service.shutdownNow();
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+   }
+}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
index 6e1b096f00..a4ca962d76 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
@@ -103,7 +103,7 @@ public class ValidateAddressSizeTest extends 
BasicOpenWireTest {
       String messageBody = "a".repeat(messageBodySize);
       String largeMessageBody = "a".repeat(largeMessageSize);
 
-      int consumers = 10;
+      int consumers = 200;
       int numberOfMessages = 10;
       int numberOfLargeMessages = 5;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to