This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 2f6345d490c [fix][broker] Restore the behavior to dispatch batch messages according to consumer permits (#24092)
2f6345d490c is described below

commit 2f6345d490c374714a8418e9664e4fe3a9d97afc
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Wed Mar 19 15:58:06 2025 +0800

    [fix][broker] Restore the behavior to dispatch batch messages according to consumer permits (#24092)
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java | 6 +++---
 .../java/org/apache/pulsar/broker/service/BatchMessageTest.java   | 4 ++++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6f3fe19f0a1..2af04044aae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -843,14 +843,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis
                         c, c.getAvailablePermits());
             }
 
-            int maxMessagesInThisBatch =
-                    Math.max(remainingMessages, serviceConfig.getDispatcherMaxRoundRobinBatchSize());
+            int maxMessagesInThisBatch = Math.min(remainingMessages, availablePermits);
             if (c.getMaxUnackedMessages() > 0) {
                 // Calculate the maximum number of additional unacked messages allowed
                 int maxAdditionalUnackedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
                 maxMessagesInThisBatch = Math.min(maxMessagesInThisBatch, maxAdditionalUnackedMessages);
             }
-            int maxEntriesInThisBatch = Math.min(availablePermits,
+            // TODO: add tests to verify dispatcherMaxRoundRobinBatchSize is respected
+            int maxEntriesInThisBatch = Math.min(serviceConfig.getDispatcherMaxRoundRobinBatchSize(),
                     // use the average batch size per message to calculate the number of entries to
                     // dispatch. round up to the next integer without using floating point arithmetic.
                     (maxMessagesInThisBatch + avgBatchSizePerMsg - 1) / avgBatchSizePerMsg);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index e5f9e43b8bb..b821c0cc663 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -1012,6 +1012,10 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(consumer1.numMessagesInQueue() > 0);
+            assertTrue(consumer2.numMessagesInQueue() > 0);
+        });
         assertEquals(consumer1.numMessagesInQueue(), batchMessages, batchMessages);
         assertEquals(consumer2.numMessagesInQueue(), batchMessages, batchMessages);
 