This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.0.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a4c6476842c0bb4c035975a43ba1bb9bd14e67c2 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Tue Feb 25 12:57:17 2025 +0800 [improve] [broker] Make the estimated entry size more accurate (#23931) (cherry picked from commit 35a16768ff095449e228a8aa1774b26a068e67e9) (cherry picked from commit d0e95db50b2362aca47507fec4e98a7f4903fe65) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 58 +++++++++---- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 + .../mledger/impl/cache/RangeEntryCacheImpl.java | 12 +-- .../impl/InflightReadsLimiterIntegrationTest.java | 15 ++-- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 98 ++++++++++++++++++++-- .../BatchMessageWithBatchIndexLevelTest.java | 12 +-- .../pulsar/broker/stats/ConsumerStatsTest.java | 19 +++-- 7 files changed, 166 insertions(+), 50 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0755c284de7..58c95e106ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -3686,26 +3687,51 @@ public class ManagedCursorImpl implements ManagedCursor { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; } + int maxEntriesBasedOnSize = + Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue(); + return Math.min(maxEntriesBasedOnSize, maxEntries); + } - double avgEntrySize = ledger.getStats().getEntrySizeAverage(); - if (!Double.isFinite(avgEntrySize)) { - // We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats - avgEntrySize = (double) entriesReadSize / (double) entriesReadCount; - } - - if (!Double.isFinite(avgEntrySize)) { - // If we still don't have any information, it means this is the first time we attempt reading - // and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats - return 1; + static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) { + Position posToRead = readPosition; + if (!ml.isValidPosition(readPosition)) { + posToRead = ml.getNextValidPosition(readPosition); } + long result = 0; + long remainingBytesSize = bytesSize; - int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize); - if (maxEntriesBasedOnSize < 1) { - // We need to read at least one entry - return 1; + while (remainingBytesSize > 0) { + // Last ledger. + if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) { + if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) { + // Only read 1 entry if no entries to read. + return 1; + } + long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries()) + + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + result += remainingBytesSize / avg; + break; + } + // Skip empty ledger. + LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId()); + if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) { + posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE)); + continue; + } + // Calculate entries by average of ledgers. + long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId(); + if (remainEntriesOfLedger * avg >= remainingBytesSize) { + result += remainingBytesSize / avg; + break; + } else { + // Calculate for the next ledger. + result += remainEntriesOfLedger; + remainingBytesSize -= remainEntriesOfLedger * avg; + posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE)); + } } - - return Math.min(maxEntriesBasedOnSize, maxEntries); + return Math.max(result, 1); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ef5b3b0d33e..93e824157b0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -217,6 +217,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); + @VisibleForTesting + @Getter protected volatile LedgerHandle currentLedger; protected volatile long currentLedgerEntries = 0; protected volatile long currentLedgerSize = 0; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index c8d14cebebc..02718561705 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -302,7 +302,7 @@ public class RangeEntryCacheImpl implements EntryCache { doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry, originalCallback, ctx); } else { - long estimatedEntrySize = getEstimatedEntrySize(); + long estimatedEntrySize = getEstimatedEntrySize(lh); long estimatedReadSize = numberOfEntries * estimatedEntrySize; if (log.isDebugEnabled()) { log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size", @@ -418,12 +418,12 @@ public class RangeEntryCacheImpl implements EntryCache { } @VisibleForTesting - public long getEstimatedEntrySize() { - long estimatedEntrySize = getAvgEntrySize(); - if (estimatedEntrySize == 0) { - estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE; + public long getEstimatedEntrySize(ReadHandle lh) { + if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) { + // No entries stored. + return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; } - return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; } private long getAvgEntrySize() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java index 48f0cf08ddf..6676baf8b55 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java @@ -141,10 +141,9 @@ public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCas SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback(); entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx); cb0.entries.join(); - Long sizePerEntry1 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue(); Awaitility.await().untilAsserted(() -> { - long remainingBytes =limiter.getRemainingBytes(); + long remainingBytes = limiter.getRemainingBytes(); Assert.assertEquals(remainingBytes, totalCapacity); }); log.info("remainingBytes 0: {}", limiter.getRemainingBytes()); @@ -165,7 +164,7 @@ public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCas entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx); }).start(); - long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1); + long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry); long remainingBytesExpected1 = totalCapacity - bytesAcquired1; log.info("acquired : {}", bytesAcquired1); log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1); @@ -178,9 +177,7 @@ public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCas Thread.sleep(3000); readCompleteSignal1.countDown(); cb1.entries.join(); - Long sizePerEntry2 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); - long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1); + long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry); long remainingBytesExpected2 = totalCapacity - bytesAcquired2; log.info("acquired : {}", bytesAcquired2); log.info("remainingBytesExpected 1: {}", remainingBytesExpected2); @@ -191,8 +188,6 @@ public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCas readCompleteSignal2.countDown(); cb2.entries.join(); - Long sizePerEntry3 = entryCache.getEstimatedEntrySize(); - Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); Awaitility.await().untilAsserted(() -> { long remainingBytes = limiter.getRemainingBytes(); log.info("remainingBytes 2: {}", remainingBytes); @@ -204,7 +199,7 @@ public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCas } private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) { - return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + return entriesCount * perEntrySize; } class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index fe484d62c4e..62c1539648b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -681,13 +682,15 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ManagedCursor cursor = ledger.openCursor("c1"); for (int i = 0; i < 100; i++) { - ledger.addEntry(new byte[1024]); + ledger.addEntry(new byte[(int) (1024)]); } - // First time, since we don't have info, we'll get 1 single entry - readAndCheck(cursor, 10, 3 * 1024, 1); + // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer + // will get more messages than before(it only receives 1 messages at the first delivery), + int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024); + readAndCheck(cursor, 10, 3 * avg, 3); // We should only return 3 entries, based on the max size - readAndCheck(cursor, 20, 3 * 1024, 3); + readAndCheck(cursor, 20, 3 * avg, 3); // If maxSize is < avg, we should get 1 entry readAndCheck(cursor, 10, 500, 1); } @@ -3885,13 +3888,15 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ledger.addEntry(new byte[1024]); } - // First time, since we don't have info, we'll get 1 single entry - List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024); - assertEquals(entries.size(), 1); + // Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer + // will get more messages than before(it only receives 1 messages at the first delivery), + int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + List<Entry> entries = c.readEntriesOrWait(10, 3 * avg); + assertEquals(entries.size(), 3); entries.forEach(Entry::release); // We should only return 3 entries, based on the max size - entries = c.readEntriesOrWait(10, 3 * 1024); + entries = c.readEntriesOrWait(10, 3 * avg); assertEquals(entries.size(), 3); entries.forEach(Entry::release); @@ -4798,5 +4803,82 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext()); } + @Test + public void testEstimateEntryCountBySize() throws Exception { + final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", ""); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); + long entryCount0 = + ManagedCursorImpl.estimateEntryCountBySize(16, PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml); + assertEquals(entryCount0, 1); + // Avoid trimming ledgers. + ml.openCursor("c1"); + + // Build data. + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1}); + } + long ledger1 = ml.getCurrentLedger().getId(); + ml.getCurrentLedger().close(); + ml.ledgerClosed(ml.getCurrentLedger()); + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1, 2}); + } + long ledger2 = ml.getCurrentLedger().getId(); + ml.getCurrentLedger().close(); + ml.ledgerClosed(ml.getCurrentLedger()); + for (int i = 0; i < 100; i++) { + ml.addEntry(new byte[]{1, 2, 3, 4}); + } + long ledger3 = ml.getCurrentLedger().getId(); + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1); + MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2); + long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + + // Test: the individual ledgers. + long entryCount1 = + ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount1, 16); + long entryCount2 = + ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionImpl.get(ledger2, 0), ml); + assertEquals(entryCount2, 8); + long entryCount3 = + ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionImpl.get(ledger3, 0), ml); + assertEquals(entryCount3, 4); + + // Test: across ledgers. + long entryCount4 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount4, 108); + long entryCount5 = + ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionImpl.get(ledger2, 0), ml); + assertEquals(entryCount5, 104); + long entryCount6 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount6, 204); + + long entryCount7 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionImpl.get(ledger1, 80), ml); + assertEquals(entryCount7, 28); + long entryCount8 = + ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionImpl.get(ledger2, 80), ml); + assertEquals(entryCount8, 24); + long entryCount9 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 80), ml); + assertEquals(entryCount9, 124); + + // Test: read more than entries written. + long entryCount10 = + ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionImpl.get(ledger1, 0), ml); + assertEquals(entryCount10, 304); + + // cleanup. + ml.delete(); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 52147f74f4a..21a7c179b82 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -85,7 +85,7 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { .newConsumer() .topic(topicName) .subscriptionName(subscriptionName) - .receiverQueueSize(10) + .receiverQueueSize(50) .subscriptionType(SubscriptionType.Shared) .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -114,27 +114,29 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { consumer.acknowledge(receive1); consumer.acknowledge(receive2); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18); + // Since https://github.com/apache/pulsar/pull/23931 improved the mechanism of estimate average entry size, + // broker will deliver much messages than before. So edit 18 -> 38 here. + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38); }); Message<byte[]> receive3 = consumer.receive(); Message<byte[]> receive4 = consumer.receive(); consumer.acknowledge(receive3); consumer.acknowledge(receive4); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36); }); // Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436. consumer.pause(); Message<byte[]> receive5 = consumer.receive(); consumer.negativeAcknowledge(receive5); Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20); }); // Unblock cmd-flow. consumer.resume(); consumer.receive(); Awaitility.await().untilAsserted(() -> { - assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 5aeed40107d..423c4daa291 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -388,8 +388,13 @@ public class ConsumerStatsTest extends ProducerConsumerBase { .batchingMaxPublishDelay(5, TimeUnit.SECONDS) .batchingMaxBytes(Integer.MAX_VALUE) .create(); - - producer.send("first-message"); + // The first messages deliver: 20 msgs. + // Average of "messages per batch" is "1". + for (int i = 0; i < 20; i++) { + producer.send("first-message"); + } + // The second messages deliver: 20 msgs. + // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3". List<CompletableFuture<MessageId>> futures = new ArrayList<>(); for (int i = 0; i < 20; i++) { futures.add(producer.sendAsync("message")); @@ -423,6 +428,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase { metadataConsumer.put("matchValueReschedule", "producer2"); @Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer) + .receiverQueueSize(20) .subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); int counter = 0; @@ -437,14 +443,17 @@ public class ConsumerStatsTest extends ProducerConsumerBase { } } - assertEquals(21, counter); + assertEquals(40, counter); ConsumerStats consumerStats = admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0); - assertEquals(21, consumerStats.getMsgOutCounter()); + assertEquals(40, consumerStats.getMsgOutCounter()); - // Math.round(1 * 0.9 + 0.1 * (20 / 1)) + // The first messages deliver: 20 msgs. + // Average of "messages per batch" is "1". + // The second messages deliver: 20 msgs. + // Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3". int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry(); assertEquals(3, avgMessagesPerEntry); }