oneby-wang commented on code in PR #25117:
URL: https://github.com/apache/pulsar/pull/25117#discussion_r2767230739
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2257,37 +2257,51 @@ public void asyncMarkDelete(final Position position,
Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}",
ledger.getName(), name, position);
}
+ // Snapshot all positions into local variables to avoid race condition.
Position newPosition = ackBatchPosition(position);
+ Position moveForwardPosition = newPosition;
Position markDeletePos = markDeletePosition;
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
- if (lastConfirmedEntry.compareTo(newPosition) < 0) {
- boolean shouldCursorMoveForward = false;
- try {
- long ledgerEntries =
ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
+ boolean shouldCursorMoveForward = false;
+ try {
+ if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId())
{
+ LedgerInfo curMarkDeleteLedgerInfo =
ledger.getLedgerInfo(newPosition.getLedgerId()).get();
+ Long nextValidLedger =
ledger.getNextValidLedger(newPosition.getLedgerId());
+ shouldCursorMoveForward = (nextValidLedger != null)
+ && (curMarkDeleteLedgerInfo != null
+ && newPosition.getEntryId() + 1 >=
curMarkDeleteLedgerInfo.getEntries());
+ if (shouldCursorMoveForward) {
+ moveForwardPosition =
PositionFactory.create(nextValidLedger, -1);
+ }
Review Comment:
> the `getNextValidLedger` will return the given ledger id if it's already
the last ledger.
the `getNextValidLedger` method calls `ledgers.ceilingKey(ledgerId + 1)`, if
`ledgerId + 1` doesn't exists in this `NavigableMap`, the method will return
null, indicating the current ledger is the last ledger.
https://github.com/apache/pulsar/blob/6d415c6dbfb8bf9f2f8a5b823927cf422a6bd675/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L4195-L4197
> btw. For a LedgerInfo instance, it's currently possible to detect the
rollover by checking the timestamp field. It's != 0 for rolled over ledgers.
I think `nextValidLedger != null` is ok to detect the rollover, the
`timestamp` field may be modified by mistake.
> Another detail of batch acknowledgements can be seen in ackBatchPosition
method. If the last entry isn't fully acknowledged, the acked position would be
the previous position. I'm not sure if this impacts this logic.
Similar logic in the
`ManagedLedgerImpl#maybeUpdateCursorBeforeTrimmingConsumedLedger()` method, I'm
wondering if this method also affects the `ackBatchPosition` method.
https://github.com/apache/pulsar/blob/6d415c6dbfb8bf9f2f8a5b823927cf422a6bd675/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2722-L2740
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]