This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bae4962d09a [fix] [broker] Fix estimateBacklogFromPosition if position
is greater than the greatest ledgerId (#20069)
bae4962d09a is described below
commit bae4962d09ac259b50a726882cced7a8262a7ee3
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Apr 11 17:36:37 2023 +0800
[fix] [broker] Fix estimateBacklogFromPosition if position is greater than
the greatest ledgerId (#20069)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2c5000afca5..9e999ab5939 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1255,18 +1255,16 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
long estimateBacklogFromPosition(PositionImpl pos) {
synchronized (this) {
+ long sizeBeforePosLedger =
ledgers.headMap(pos.getLedgerId()).values()
+ .stream().mapToLong(LedgerInfo::getSize).sum();
LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+ long sizeAfter = getTotalSize() - sizeBeforePosLedger;
if (ledgerInfo == null) {
- return getTotalSize(); // position no longer in managed
ledger, so return total size
- }
- long sizeBeforePosLedger = ledgers.values().stream().filter(li ->
li.getLedgerId() < pos.getLedgerId())
- .mapToLong(LedgerInfo::getSize).sum();
- long size = getTotalSize() - sizeBeforePosLedger;
-
- if (pos.getLedgerId() == currentLedger.getId()) {
- return size - consumedLedgerSize(currentLedgerSize,
currentLedgerEntries, pos.getEntryId());
+ return sizeAfter;
+ } else if (pos.getLedgerId() == currentLedger.getId()) {
+ return sizeAfter - consumedLedgerSize(currentLedgerSize,
currentLedgerEntries, pos.getEntryId());
} else {
- return size - consumedLedgerSize(ledgerInfo.getSize(),
ledgerInfo.getEntries(), pos.getEntryId());
+ return sizeAfter - consumedLedgerSize(ledgerInfo.getSize(),
ledgerInfo.getEntries(), pos.getEntryId());
}
}
}
@@ -1275,7 +1273,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (ledgerEntries <= 0) {
return 0;
}
- if (ledgerEntries == (consumedEntries + 1)) {
+ if (ledgerEntries <= (consumedEntries + 1)) {
return ledgerSize;
} else {
long averageSize = ledgerSize / ledgerEntries;