This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a4c6476842c0bb4c035975a43ba1bb9bd14e67c2
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Feb 25 12:57:17 2025 +0800

    [improve] [broker] Make the estimated entry size more accurate (#23931)
    
    (cherry picked from commit 35a16768ff095449e228a8aa1774b26a068e67e9)
    (cherry picked from commit d0e95db50b2362aca47507fec4e98a7f4903fe65)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 58 +++++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 12 +--
 .../impl/InflightReadsLimiterIntegrationTest.java  | 15 ++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 98 ++++++++++++++++++++--
 .../BatchMessageWithBatchIndexLevelTest.java       | 12 +--
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 19 +++--
 7 files changed, 166 insertions(+), 50 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0755c284de7..58c95e106ee 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
 import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -3686,26 +3687,51 @@ public class ManagedCursorImpl implements ManagedCursor 
{
         if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
             return maxEntries;
         }
+        int maxEntriesBasedOnSize =
+                Long.valueOf(estimateEntryCountBySize(maxSizeBytes, 
readPosition, ledger)).intValue();
+        return Math.min(maxEntriesBasedOnSize, maxEntries);
+    }
 
-        double avgEntrySize = ledger.getStats().getEntrySizeAverage();
-        if (!Double.isFinite(avgEntrySize)) {
-            // We don't have yet any stats on the topic entries. Let's try to 
use the cursor avg size stats
-            avgEntrySize = (double) entriesReadSize / (double) 
entriesReadCount;
-        }
-
-        if (!Double.isFinite(avgEntrySize)) {
-            // If we still don't have any information, it means this is the 
first time we attempt reading
-            // and there are no writes. Let's start with 1 to avoid any 
overflow and start the avg stats
-            return 1;
+    static long estimateEntryCountBySize(long bytesSize, PositionImpl 
readPosition, ManagedLedgerImpl ml) {
+        Position posToRead = readPosition;
+        if (!ml.isValidPosition(readPosition)) {
+            posToRead = ml.getNextValidPosition(readPosition);
         }
+        long result = 0;
+        long remainingBytesSize = bytesSize;
 
-        int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
-        if (maxEntriesBasedOnSize < 1) {
-            // We need to read at least one entry
-            return 1;
+        while (remainingBytesSize > 0) {
+            // Last ledger.
+            if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
+                if (ml.getCurrentLedgerSize() == 0 ||  
ml.getCurrentLedgerEntries() == 0) {
+                    // Only read 1 entry if no entries to read.
+                    return 1;
+                }
+                long avg = Math.max(1, ml.getCurrentLedgerSize() / 
ml.getCurrentLedgerEntries())
+                        + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+                result += remainingBytesSize / avg;
+                break;
+            }
+            // Skip empty ledger.
+            LedgerInfo ledgerInfo = 
ml.getLedgersInfo().get(posToRead.getLedgerId());
+            if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
+                posToRead = 
ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), 
Long.MAX_VALUE));
+                continue;
+            }
+            // Calculate entries by average of ledgers.
+            long avg = Math.max(1, ledgerInfo.getSize() / 
ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+            long remainEntriesOfLedger = ledgerInfo.getEntries() - 
posToRead.getEntryId();
+            if (remainEntriesOfLedger * avg >= remainingBytesSize) {
+                result += remainingBytesSize / avg;
+                break;
+            } else {
+                // Calculate for the next ledger.
+                result += remainEntriesOfLedger;
+                remainingBytesSize -= remainEntriesOfLedger * avg;
+                posToRead = 
ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), 
Long.MAX_VALUE));
+            }
         }
-
-        return Math.min(maxEntriesBasedOnSize, maxEntries);
+        return Math.max(result, 1);
     }
 
     @Override
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ef5b3b0d33e..93e824157b0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -217,6 +217,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private final CallbackMutex offloadMutex = new CallbackMutex();
     private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE 
= CompletableFuture
             .completedFuture(PositionImpl.LATEST);
+    @VisibleForTesting
+    @Getter
     protected volatile LedgerHandle currentLedger;
     protected volatile long currentLedgerEntries = 0;
     protected volatile long currentLedgerSize = 0;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index c8d14cebebc..02718561705 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -302,7 +302,7 @@ public class RangeEntryCacheImpl implements EntryCache {
             doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, 
numberOfEntries, shouldCacheEntry,
                     originalCallback, ctx);
         } else {
-            long estimatedEntrySize = getEstimatedEntrySize();
+            long estimatedEntrySize = getEstimatedEntrySize(lh);
             long estimatedReadSize = numberOfEntries * estimatedEntrySize;
             if (log.isDebugEnabled()) {
                 log.debug("Estimated read size: {} bytes for {} entries with 
{} estimated entry size",
@@ -418,12 +418,12 @@ public class RangeEntryCacheImpl implements EntryCache {
     }
 
     @VisibleForTesting
-    public long getEstimatedEntrySize() {
-        long estimatedEntrySize = getAvgEntrySize();
-        if (estimatedEntrySize == 0) {
-            estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
+    public long getEstimatedEntrySize(ReadHandle lh) {
+        if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
+            // No entries stored.
+            return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
         }
-        return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+        return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
     }
 
     private long getAvgEntrySize() {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
index 48f0cf08ddf..6676baf8b55 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java
@@ -141,10 +141,9 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
         SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
         entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
         cb0.entries.join();
-        Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
-        Assert.assertEquals(sizePerEntry1, 1 + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        int sizePerEntry = 
Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
         Awaitility.await().untilAsserted(() -> {
-            long remainingBytes =limiter.getRemainingBytes();
+            long remainingBytes = limiter.getRemainingBytes();
             Assert.assertEquals(remainingBytes, totalCapacity);
         });
         log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
@@ -165,7 +164,7 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
             entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, 
cb2, ctx);
         }).start();
 
-        long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 
+ readCount2, 1);
+        long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 
+ readCount2, sizePerEntry);
         long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
         log.info("acquired : {}", bytesAcquired1);
         log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
@@ -178,9 +177,7 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
         Thread.sleep(3000);
         readCompleteSignal1.countDown();
         cb1.entries.join();
-        Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
-        Assert.assertEquals(sizePerEntry2, 1 + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
-        long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 
1);
+        long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 
sizePerEntry);
         long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
         log.info("acquired : {}", bytesAcquired2);
         log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
@@ -191,8 +188,6 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
 
         readCompleteSignal2.countDown();
         cb2.entries.join();
-        Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
-        Assert.assertEquals(sizePerEntry3, 1 + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
         Awaitility.await().untilAsserted(() -> {
             long remainingBytes = limiter.getRemainingBytes();
             log.info("remainingBytes 2: {}", remainingBytes);
@@ -204,7 +199,7 @@ public class InflightReadsLimiterIntegrationTest extends 
MockedBookKeeperTestCas
     }
 
     private long calculateBytesSizeBeforeFirstReading(int entriesCount, int 
perEntrySize) {
-        return entriesCount * (perEntrySize + 
RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        return entriesCount * perEntrySize;
     }
 
     class SimpleReadEntriesCallback implements 
AsyncCallbacks.ReadEntriesCallback {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index fe484d62c4e..62c1539648b 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
@@ -681,13 +682,15 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ManagedCursor cursor = ledger.openCursor("c1");
 
         for (int i = 0; i < 100; i++) {
-            ledger.addEntry(new byte[1024]);
+            ledger.addEntry(new byte[(int) (1024)]);
         }
 
-        // First time, since we don't have info, we'll get 1 single entry
-        readAndCheck(cursor, 10, 3 * 1024, 1);
+        // Since https://github.com/apache/pulsar/pull/23931 improved the 
performance of delivery, the consumer
+        // will get more messages than before (previously it received only 1 message on 
the first delivery).
+        int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
+        readAndCheck(cursor, 10, 3 * avg, 3);
         // We should only return 3 entries, based on the max size
-        readAndCheck(cursor, 20, 3 * 1024, 3);
+        readAndCheck(cursor, 20, 3 * avg, 3);
         // If maxSize is < avg, we should get 1 entry
         readAndCheck(cursor, 10, 500, 1);
     }
@@ -3885,13 +3888,15 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
             ledger.addEntry(new byte[1024]);
         }
 
-        // First time, since we don't have info, we'll get 1 single entry
-        List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
-        assertEquals(entries.size(), 1);
+        // Since https://github.com/apache/pulsar/pull/23931 improved the 
performance of delivery, the consumer
+        // will get more messages than before (previously it received only 1 message on 
the first delivery).
+        int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
+        assertEquals(entries.size(), 3);
         entries.forEach(Entry::release);
 
         // We should only return 3 entries, based on the max size
-        entries = c.readEntriesOrWait(10, 3 * 1024);
+        entries = c.readEntriesOrWait(10, 3 * avg);
         assertEquals(entries.size(), 3);
         entries.forEach(Entry::release);
 
@@ -4798,5 +4803,82 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
     }
 
+    @Test
+    public void testEstimateEntryCountBySize() throws Exception {
+        final String mlName = "ml-" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+        long entryCount0 =
+                ManagedCursorImpl.estimateEntryCountBySize(16, 
PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml);
+        assertEquals(entryCount0, 1);
+        // Avoid trimming ledgers.
+        ml.openCursor("c1");
+
+        // Build data.
+        for (int i = 0; i < 100; i++) {
+            ml.addEntry(new byte[]{1});
+        }
+        long ledger1 = ml.getCurrentLedger().getId();
+        ml.getCurrentLedger().close();
+        ml.ledgerClosed(ml.getCurrentLedger());
+        for (int i = 0; i < 100; i++) {
+            ml.addEntry(new byte[]{1, 2});
+        }
+        long ledger2 = ml.getCurrentLedger().getId();
+        ml.getCurrentLedger().close();
+        ml.ledgerClosed(ml.getCurrentLedger());
+        for (int i = 0; i < 100; i++) {
+            ml.addEntry(new byte[]{1, 2, 3, 4});
+        }
+        long ledger3 = ml.getCurrentLedger().getId();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = 
ml.getLedgersInfo().get(ledger1);
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = 
ml.getLedgersInfo().get(ledger2);
+        long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+        long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + 
BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+        long average3 = ml.getCurrentLedgerSize() / 
ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+        assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+
+        // Test: the individual ledgers.
+        long entryCount1 =
+                ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, 
PositionImpl.get(ledger1, 0), ml);
+        assertEquals(entryCount1, 16);
+        long entryCount2 =
+                ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, 
PositionImpl.get(ledger2, 0), ml);
+        assertEquals(entryCount2, 8);
+        long entryCount3 =
+                ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, 
PositionImpl.get(ledger3, 0), ml);
+        assertEquals(entryCount3, 4);
+
+        // Test: across ledgers.
+        long entryCount4 =
+                ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + 
(average2 * 8), PositionImpl.get(ledger1, 0), ml);
+        assertEquals(entryCount4, 108);
+        long entryCount5 =
+                ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + 
(average3 * 4), PositionImpl.get(ledger2, 0), ml);
+        assertEquals(entryCount5, 104);
+        long entryCount6 =
+                ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + 
(average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml);
+        assertEquals(entryCount6, 204);
+
+        long entryCount7 =
+                ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + 
(average2 * 8), PositionImpl.get(ledger1, 80), ml);
+        assertEquals(entryCount7, 28);
+        long entryCount8 =
+                ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + 
(average3 * 4), PositionImpl.get(ledger2, 80), ml);
+        assertEquals(entryCount8, 24);
+        long entryCount9 =
+                ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + 
(average2 * 100)  + (average3 * 4), PositionImpl.get(ledger1, 80), ml);
+        assertEquals(entryCount9, 124);
+
+        // Test: read more than entries written.
+        long entryCount10 =
+                ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + 
(average2 * 100) + (average3 * 100) + (average3 * 4) , 
PositionImpl.get(ledger1, 0), ml);
+        assertEquals(entryCount10, 304);
+
+        // cleanup.
+        ml.delete();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 52147f74f4a..21a7c179b82 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -85,7 +85,7 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
                 .newConsumer()
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
-                .receiverQueueSize(10)
+                .receiverQueueSize(50)
                 .subscriptionType(SubscriptionType.Shared)
                 .enableBatchIndexAcknowledgment(true)
                 .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -114,27 +114,29 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
         consumer.acknowledge(receive1);
         consumer.acknowledge(receive2);
         Awaitility.await().untilAsserted(() -> {
-            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
+            // Since https://github.com/apache/pulsar/pull/23931 improved the 
mechanism for estimating the average entry size,
+            // the broker will deliver many more messages than before. So edit 18 -> 38 
here.
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
         });
         Message<byte[]> receive3 = consumer.receive();
         Message<byte[]> receive4 = consumer.receive();
         consumer.acknowledge(receive3);
         consumer.acknowledge(receive4);
         Awaitility.await().untilAsserted(() -> {
-            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
         });
         // Block cmd-flow send until verify finish. see: 
https://github.com/apache/pulsar/pull/17436.
         consumer.pause();
         Message<byte[]> receive5 = consumer.receive();
         consumer.negativeAcknowledge(receive5);
         Awaitility.await().pollInterval(1, 
TimeUnit.MILLISECONDS).untilAsserted(() -> {
-            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
         });
         // Unblock cmd-flow.
         consumer.resume();
         consumer.receive();
         Awaitility.await().untilAsserted(() -> {
-            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
+            
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 5aeed40107d..423c4daa291 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -388,8 +388,13 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
                 .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
                 .batchingMaxBytes(Integer.MAX_VALUE)
                 .create();
-
-        producer.send("first-message");
+        // The first messages deliver: 20 msgs.
+        // Average of "messages per batch" is "1".
+        for (int i = 0; i < 20; i++) {
+            producer.send("first-message");
+        }
+        // The second messages deliver: 20 msgs.
+        // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) 
= 2.9 ~ 3".
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
         for (int i = 0; i < 20; i++) {
             futures.add(producer.sendAsync("message"));
@@ -423,6 +428,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
         metadataConsumer.put("matchValueReschedule", "producer2");
         @Cleanup
         Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
+                .receiverQueueSize(20)
                 
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
 
         int counter = 0;
@@ -437,14 +443,17 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
             }
         }
 
-        assertEquals(21, counter);
+        assertEquals(40, counter);
 
         ConsumerStats consumerStats =
                 
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
 
-        assertEquals(21, consumerStats.getMsgOutCounter());
+        assertEquals(40, consumerStats.getMsgOutCounter());
 
-        // Math.round(1 * 0.9 + 0.1 * (20 / 1))
+        // The first messages deliver: 20 msgs.
+        // Average of "messages per batch" is "1".
+        // The second messages deliver: 20 msgs.
+        // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) 
= 2.9 ~ 3".
         int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
         assertEquals(3, avgMessagesPerEntry);
     }

Reply via email to