This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new bae4962d09a [fix] [broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#20069)
bae4962d09a is described below

commit bae4962d09ac259b50a726882cced7a8262a7ee3
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Apr 11 17:36:37 2023 +0800

    [fix] [broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#20069)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2c5000afca5..9e999ab5939 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1255,18 +1255,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     long estimateBacklogFromPosition(PositionImpl pos) {
         synchronized (this) {
+            long sizeBeforePosLedger = ledgers.headMap(pos.getLedgerId()).values()
+                    .stream().mapToLong(LedgerInfo::getSize).sum();
             LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+            long sizeAfter = getTotalSize() - sizeBeforePosLedger;
             if (ledgerInfo == null) {
-                return getTotalSize(); // position no longer in managed ledger, so return total size
-            }
-            long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
-                    .mapToLong(LedgerInfo::getSize).sum();
-            long size = getTotalSize() - sizeBeforePosLedger;
-
-            if (pos.getLedgerId() == currentLedger.getId()) {
-                return size - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
+                return sizeAfter;
+            } else if (pos.getLedgerId() == currentLedger.getId()) {
+                return sizeAfter - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
             } else {
-                return size - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
+                return sizeAfter - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
             }
         }
     }
@@ -1275,7 +1273,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (ledgerEntries <= 0) {
             return 0;
         }
-        if (ledgerEntries == (consumedEntries + 1)) {
+        if (ledgerEntries <= (consumedEntries + 1)) {
             return ledgerSize;
         } else {
             long averageSize = ledgerSize / ledgerEntries;

Reply via email to