This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 1691e5b7228 Fix issues in advanceNonDurableCursors (#10667)
1691e5b7228 is described below
commit 1691e5b7228f20957567c6ed8ee114ecddbdbd91
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue May 25 09:28:24 2021 -0700
Fix issues in advanceNonDurableCursors (#10667)
Co-authored-by: Jerry Peng <[email protected]>
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index db500e34b4a..7bdc5c9ef11 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2201,7 +2201,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return;
}
- advanceNonDurableCursors(ledgersToDelete);
+ advanceCursorsIfNecessary(ledgersToDelete);
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
@@ -2270,20 +2270,29 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
/**
* Non-durable cursors have to be moved forward when data is trimmed since
they are not retain that data.
+ * This method also addresses a corner case for durable cursors in which
the cursor is caught up, i.e. the mark delete position
+ * happens to be the last entry in a ledger. If the ledger is deleted,
then subsequent calculations for backlog
+ * size may not be accurate since the method getNumberOfEntries we use in
backlog calculation will not be able to fetch
+ * the ledger info of a deleted ledger. Thus, we need to update the mark
delete position to the "-1" entry of the first ledger
+ * that is not marked for deletion.
* This is to make sure that the `consumedEntries` counter is correctly
updated with the number of skipped
* entries and the stats are reported correctly.
*/
- private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+ private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
if (ledgersToDelete.isEmpty()) {
return;
}
- long firstNonDeletedLedger = ledgers
- .higherKey(ledgersToDelete.get(ledgersToDelete.size() -
1).getLedgerId());
+ // need to move mark delete for non-durable cursors to the first
ledger NOT marked for deletion
+ // calling getNumberOfEntries latter for a ledger that is already
deleted will be problematic and return incorrect results
+ long firstNonDeletedLedger =
ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() -
1).getLedgerId());
PositionImpl highestPositionToDelete = new
PositionImpl(firstNonDeletedLedger, -1);
cursors.forEach(cursor -> {
- if (highestPositionToDelete.compareTo((PositionImpl)
cursor.getMarkDeletedPosition()) > 0) {
+ // move the mark delete position to the highestPositionToDelete
only if it is smaller than the add confirmed
+ // to prevent the edge case where the cursor is caught up to the
latest and highestPositionToDelete may be larger than the last add confirmed
+ if (highestPositionToDelete.compareTo((PositionImpl)
cursor.getMarkDeletedPosition()) > 0
+ && highestPositionToDelete.compareTo((PositionImpl)
cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
cursor.asyncMarkDelete(highestPositionToDelete, new
MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {