ShuKe-code opened a new issue, #25813:
URL: https://github.com/apache/pulsar/issues/25813

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   Pulsar broker 3.0.12
   Java 17
   
   ### Issue Description
   
   Observed on a 3.0.x-based branch.
   
   ### What did you expect to see?
   
   After the inactive subscription is deleted and all old ledgers are trimmed, 
recreating the durable subscription with `Earliest` should initialize the 
cursor with backlog `0` if there is no real remaining data.
   
   `stats.backlog`, `stats-internal`, and precise backlog should be consistent.
   
   ### What did you see instead?
   
   `admin topics stats` can report `backlog > 0`, while:
   
   - `admin topics stats-internal` shows old ledgers already trimmed
   - precise backlog is `0`
   - the consumer cannot actually receive any message
   - unloading the topic makes the reported backlog become `0`
   
   ### Error messages
   
   ```text
   
   ```
   
   ### Reproducing the issue
   
   1. Create a partitioned topic with a durable subscription.
   2. Produce messages across multiple ledgers.
   3. Configure retention aggressively so old ledgers are trimmed, leaving only 
a new empty current ledger.
   4. Wait until the inactive subscription has been deleted.
   5. Create a subscription with a new name and 
`SubscriptionInitialPosition.Earliest`.
   6. Compare:
      - `admin topics stats <topic>`
      - `admin topics stats-internal <topic>`
      - precise backlog
   
   ### Additional information
   
   The root cause is in `ManagedLedgerImpl#getFirstPositionAndCounter()`, which 
runs during earliest cursor initialization.
   
   When all historical ledgers are already trimmed and only an empty current 
ledger remains, `getFirstPosition()` can return a synthetic position on the old 
last confirmed ledger (for example `469:-1`).  
   `getFirstPositionAndCounter()` then uses that synthetic position in 
`getNumberOfEntries(Range.openClosed(...))`, which counts entries from a ledger 
that no longer exists in `ledgers`.
   
   This leads to an incorrect `messagesConsumedCounter` when a durable 
subscription is recreated with `SubscriptionInitialPosition.Earliest` after 
inactive subscription deletion. The inaccurate counter then propagates into 
approximate backlog stats.
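
The miscount can be reproduced with a small standalone model. The sketch below is not Pulsar code: `Position`, `firstPosition`, `entriesInOpenClosedRange`, and `consumedCounterAtEarliest` are hypothetical stand-ins, and the sample numbers (trimmed ledger `469` with 100 entries, empty current ledger `470`, 250 entries added in total) are assumptions chosen to mirror the description above. It also assumes the approximate backlog is computed as `entriesAddedCounter - messagesConsumedCounter`.

```java
import java.util.TreeMap;

final class EarliestCounterSketch {
    /** Hypothetical stand-in for a managed-ledger position (ledgerId:entryId). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /**
     * Assumed behavior of getFirstPosition(): when the first remaining ledger is newer
     * than the last confirmed entry, return a synthetic position on the old ledger.
     */
    static Position firstPosition(TreeMap<Long, Long> ledgers, Position lastConfirmed) {
        long firstLedgerId = ledgers.firstKey();
        if (firstLedgerId > lastConfirmed.ledgerId) {
            firstLedgerId = lastConfirmed.ledgerId;
        }
        return new Position(firstLedgerId, -1);
    }

    /** Counts entries in (from, to]; the map holds ledgerId -> number of entries. */
    static long entriesInOpenClosedRange(TreeMap<Long, Long> ledgers, Position from, Position to) {
        if (from.ledgerId == to.ledgerId) {
            // Pure arithmetic: the ledger map is never consulted on this path,
            // so a trimmed ledger is still counted.
            return to.entryId - from.entryId;
        }
        long count = to.entryId + 1; // entries 0..to.entryId in the last ledger
        Long inFirst = ledgers.get(from.ledgerId);
        if (inFirst != null) {
            count += inFirst - (from.entryId + 1); // rest of the first ledger
        }
        for (long entries : ledgers.subMap(from.ledgerId, false, to.ledgerId, false).values()) {
            count += entries; // whole ledgers in between
        }
        return count;
    }

    /** Counter given to a cursor created at Earliest, without any special case. */
    static long consumedCounterAtEarliest(TreeMap<Long, Long> ledgers, Position lastConfirmed,
                                          long entriesAdded) {
        Position first = firstPosition(ledgers, lastConfirmed);
        return entriesAdded - entriesInOpenClosedRange(ledgers, first, lastConfirmed);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(470L, 0L);                          // only the empty current ledger is left
        Position lastConfirmed = new Position(469, 99); // points into the trimmed ledger
        long entriesAdded = 250;

        long consumed = consumedCounterAtEarliest(ledgers, lastConfirmed, entriesAdded);
        System.out.println("messagesConsumedCounter = " + consumed);
        System.out.println("approximate backlog = " + (entriesAdded - consumed));
    }
}
```

In this model the counter comes out as 150 and the approximate backlog as 100, although no readable entry remains.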
   
   A fix is to treat this synthetic-first-position case specially and 
initialize the earliest cursor from `lastConfirmedEntry` and 
`entriesAddedCounter` instead of counting against trimmed ledgers.
   
   --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
   +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
   @@ -3798,19 +3798,40 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         */
        Pair<PositionImpl, Long> getFirstPositionAndCounter() {
            PositionImpl pos;
   +        PositionImpl firstPosition;
            long count;
            Pair<PositionImpl, Long> lastPositionAndCounter;
    
            do {
   -            pos = getFirstPosition();
   +            firstPosition = getFirstPosition();
                lastPositionAndCounter = getLastPositionAndCounter();
   -            count = lastPositionAndCounter.getRight()
   -                    - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
   -        } while (pos.compareTo(getFirstPosition()) != 0
   +            if (isSyntheticFirstPosition(firstPosition)) {
   +                pos = lastPositionAndCounter.getLeft();
   +                count = lastPositionAndCounter.getRight();
   +            } else {
   +                pos = firstPosition;
   +                count = lastPositionAndCounter.getRight()
   +                        - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
   +            }
   +        } while (firstPosition.compareTo(getFirstPosition()) != 0
                    || lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0);
            return Pair.of(pos, count);
        }
    
   +    private boolean isSyntheticFirstPosition(PositionImpl position) {
   +        Long firstLedgerId = ledgers.firstKey();
   +        if (firstLedgerId == null || position == null) {
   +            return false;
   +        }
   +
   +        LedgerInfo firstLedger = ledgers.get(firstLedgerId);
   +        return firstLedgerId > lastConfirmedEntry.getLedgerId()
   +                && position.getLedgerId() == lastConfirmedEntry.getLedgerId()
   +                && position.getEntryId() == -1
   +                && firstLedger != null
   +                && firstLedger.getEntries() == 0;
   +    }
   +
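
The effect of the special case in the patch above can be checked against a standalone model. This is a sketch under assumptions, not the Pulsar implementation: `Position`, `Init`, `entriesInOpenClosedRange`, and `initEarliest` are hypothetical, and the ledger map is reduced to `ledgerId -> entry count`. One deliberate difference from the patch: `java.util.SortedMap#firstKey()` throws `NoSuchElementException` on an empty map rather than returning `null`, so the sketch guards with `isEmpty()`.

```java
import java.util.TreeMap;

final class SyntheticFirstPositionSketch {
    /** Hypothetical stand-in for PositionImpl (ledgerId:entryId). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /** Start position and consumed counter chosen for an Earliest cursor. */
    static final class Init {
        final Position pos;
        final long consumedCounter;

        Init(Position pos, long consumedCounter) {
            this.pos = pos;
            this.consumedCounter = consumedCounter;
        }
    }

    /** Same conditions as the predicate in the patch, expressed on the simplified map. */
    static boolean isSyntheticFirstPosition(TreeMap<Long, Long> ledgers, Position first,
                                            Position lastConfirmed) {
        if (ledgers.isEmpty() || first == null) {
            return false;
        }
        long firstLedgerId = ledgers.firstKey();
        return firstLedgerId > lastConfirmed.ledgerId
                && first.ledgerId == lastConfirmed.ledgerId
                && first.entryId == -1
                && ledgers.firstEntry().getValue() == 0L;
    }

    /** Counts entries in (from, to] using the simplified ledger map. */
    static long entriesInOpenClosedRange(TreeMap<Long, Long> ledgers, Position from, Position to) {
        if (from.ledgerId == to.ledgerId) {
            return to.entryId - from.entryId;
        }
        long count = to.entryId + 1;
        Long inFirst = ledgers.get(from.ledgerId);
        if (inFirst != null) {
            count += inFirst - (from.entryId + 1);
        }
        for (long entries : ledgers.subMap(from.ledgerId, false, to.ledgerId, false).values()) {
            count += entries;
        }
        return count;
    }

    /** Earliest initialization with the synthetic-first-position case handled separately. */
    static Init initEarliest(TreeMap<Long, Long> ledgers, Position first, Position lastConfirmed,
                             long entriesAdded) {
        if (isSyntheticFirstPosition(ledgers, first, lastConfirmed)) {
            // Nothing readable remains: start at the last confirmed entry with a full counter.
            return new Init(lastConfirmed, entriesAdded);
        }
        long inRange = entriesInOpenClosedRange(ledgers, first, lastConfirmed);
        return new Init(first, entriesAdded - inRange);
    }
}
```

With only an empty ledger `470` left, a last confirmed entry of `469:99`, and 250 entries added, `initEarliest` returns a counter of 250, so the modeled backlog is 0. When ledger `469` is still present, the predicate is false and the regular counting path is used.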
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
