This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f6345d490c [fix][broker] Restore the behavior to dispatch batch messages according to consumer permits (#24092)
2f6345d490c is described below

commit 2f6345d490c374714a8418e9664e4fe3a9d97afc
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Wed Mar 19 15:58:06 2025 +0800

    [fix][broker] Restore the behavior to dispatch batch messages according to consumer permits (#24092)
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java   | 6 +++---
 .../java/org/apache/pulsar/broker/service/BatchMessageTest.java     | 4 ++++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6f3fe19f0a1..2af04044aae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -843,14 +843,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis
                         c, c.getAvailablePermits());
             }
 
-            int maxMessagesInThisBatch =
-                    Math.max(remainingMessages, 
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
+            int maxMessagesInThisBatch = Math.min(remainingMessages, 
availablePermits);
             if (c.getMaxUnackedMessages() > 0) {
                 // Calculate the maximum number of additional unacked messages 
allowed
                 int maxAdditionalUnackedMessages = 
Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
                 maxMessagesInThisBatch = Math.min(maxMessagesInThisBatch, 
maxAdditionalUnackedMessages);
             }
-            int maxEntriesInThisBatch = Math.min(availablePermits,
+            // TODO: add tests to verify dispatcherMaxRoundRobinBatchSize is 
respected
+            int maxEntriesInThisBatch = 
Math.min(serviceConfig.getDispatcherMaxRoundRobinBatchSize(),
                             // use the average batch size per message to 
calculate the number of entries to
                             // dispatch. round up to the next integer without 
using floating point arithmetic.
                             (maxMessagesInThisBatch + avgBatchSizePerMsg - 1) 
/ avgBatchSizePerMsg);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index e5f9e43b8bb..b821c0cc663 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -1012,6 +1012,10 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(consumer1.numMessagesInQueue() > 0);
+            assertTrue(consumer2.numMessagesInQueue() > 0);
+        });
         assertEquals(consumer1.numMessagesInQueue(), batchMessages, 
batchMessages);
         assertEquals(consumer2.numMessagesInQueue(), batchMessages, 
batchMessages);
 

Reply via email to