oneby-wang commented on code in PR #25101:
URL: https://github.com/apache/pulsar/pull/25101#discussion_r2639087166
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2713,26 +2713,29 @@ public void
maybeUpdateCursorBeforeTrimmingConsumedLedger() {
for (ManagedCursor cursor : cursors) {
Position lastAckedPosition =
cursor.getPersistentMarkDeletedPosition() != null
? cursor.getPersistentMarkDeletedPosition() :
cursor.getMarkDeletedPosition();
- LedgerInfo currPointedLedger =
ledgers.get(lastAckedPosition.getLedgerId());
+ LedgerInfo curPointedLedger =
ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger =
Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);
- if (currPointedLedger != null) {
+ if (curPointedLedger != null) {
if (nextPointedLedger != null) {
if (lastAckedPosition.getEntryId() != -1
- && lastAckedPosition.getEntryId() + 1 >=
currPointedLedger.getEntries()) {
+ && lastAckedPosition.getEntryId() + 1 >=
curPointedLedger.getEntries()) {
lastAckedPosition =
PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
}
} else {
log.debug("No need to reset cursor: {}, current ledger is
the last ledger.", cursor);
}
} else {
+ // TODO no ledger exists, should we move cursor mark deleted
position to nextPointedLedger:-1 ?
log.warn("Cursor: {} does not exist in the managed-ledger.",
cursor);
}
- if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
+ if (lastAckedPosition.compareTo(cursor.getMarkDeletedPosition()) >
0) {
Review Comment:
> I'm just wondering when would this condition be fulfilled.
Race condition: if ledger rolls over with `cursor,asyncMarkDelete()`
executing, and another thead(threadA) may execute `cursor,asyncMarkDelete()` at
the same time.
The two `cursor,asyncMarkDelete()` operations are executed without order
guarantee. Broken case:
1. rollover thread gets the old `markDeletePosition` 3:-1, then calculates
`lastAckedPosition` as 3:-1.
2. threadA executes `cursor,asyncMarkDelete()` success, and updates
`markDeletePosition` to 13:-1.
3. came back to rollover thread, we will see `lastAckedPosition <
markDeletePosition`.
Maybe we should snapshot `cursor.getMarkDeletePosition()` into a local
variable, instead of calling it every time to avoid this problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]