This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 725b82c49ce76c2cfb21c87b5dd60624e799ba73
Author: fengyubiao <[email protected]>
AuthorDate: Fri Jan 20 07:47:29 2023 +0800

[fix] [ml] Topics stats shows msgBacklog but there reality no backlog (#19275)

### Motivation

#### 1. `readPosition` points to a deleted ledger

When `trim ledgers` and `create new cursor` are executed concurrently, the `readPosition` of the cursor can end up pointing to a deleted ledger.

| time | `trim ledgers` | `create new cursor` |
| --- | --- | --- |
| 1 | | set read position and mark deleted position |
| 2 | delete ledger | |
| 3 | | add the cursor to `ManagedLedger.cursors` |

----

#### 2. Wrong backlog caused by the wrong `readPosition`

<strong>(Highlight)</strong> Because the read position of the cursor points to a deleted ledger, the deleted messages will never be consumed or acknowledged. The backlog in the `topics stats` API response is calculated as `managedLedger.entriesAddedCounter - cursor.messagesConsumedCounter`, so topics stats shows a `msgBacklog` although in reality there is no backlog.

- `managedLedger.entriesAddedCounter`: Pulsar sets it to `0` when creating a new managed ledger and increments it when entries are added.
- `cursor.messagesConsumedCounter`: Pulsar sets it to `0` when creating a new cursor and increments it when messages are acknowledged.

For example:

- write entries to the managed ledger: `{1:0~1:9}...{5:0~5:9}`
  - `managedLedger.entriesAddedCounter` is `50` now
- create a new cursor, and set the read position to `1:0`
  - `cursor.messagesConsumedCounter` is `0` now
- delete ledgers `1~4`
- consume all messages
  - only the messages `{5:0~5:9}` can be consumed, so `cursor.messagesConsumedCounter` is `10` now
- the `backlog` in the `topics stats` response is `50 - 10 = 40`, but in reality there is no backlog

----

#### 3. Reproduce issue

Sorry, I spent 4 hours trying to write a non-invasive test, but failed.
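
As a rough illustration only (this is not the Pulsar test and uses no Pulsar classes), the counter arithmetic from the example in section 2 can be replayed in plain Java. The class name `BacklogSketch`, the `simulate` method, and the map of ledgers are hypothetical stand-ins; only the two counter names and the subtraction come from the description above.

```java
import java.util.TreeMap;

// Hypothetical stand-in for the counters described above; not Pulsar code.
class BacklogSketch {

    /** Returns {entriesAddedCounter, messagesConsumedCounter, reportedBacklog}. */
    static long[] simulate() {
        // ledgerId -> number of entries. Ledgers 1..5 hold 10 entries each.
        TreeMap<Long, Integer> ledgers = new TreeMap<>();
        long entriesAddedCounter = 0;
        for (long id = 1; id <= 5; id++) {
            ledgers.put(id, 10);
            entriesAddedCounter += 10;
        }

        // A new cursor starts at 1:0 with nothing consumed yet.
        long messagesConsumedCounter = 0;

        // Ledgers 1..4 are trimmed while the cursor still points at 1:0.
        for (long id = 1; id <= 4; id++) {
            ledgers.remove(id);
        }

        // The consumer can only read (and acknowledge) what still exists.
        for (int entries : ledgers.values()) {
            messagesConsumedCounter += entries;
        }

        // Same subtraction as the one used for the reported backlog.
        long backlog = entriesAddedCounter - messagesConsumedCounter;
        return new long[] {entriesAddedCounter, messagesConsumedCounter, backlog};
    }

    public static void main(String[] args) {
        long[] r = simulate();
        System.out.println("added=" + r[0] + " consumed=" + r[1] + " backlog=" + r[2]);
    }
}
```

The 40 entries in the trimmed ledgers are counted as added but can never be counted as consumed, which is why the reported value stays above zero.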
<strong>(Highlight)</strong> You can reproduce the issue with `testBacklogIfCursorCreateConcurrentWithTrimLedger` in PR #19274:
https://github.com/apache/pulsar/blob/a2cdc759fc2710e4dd913eb0485d23ebcaa076a4/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/StatsBackLogTest.java#L163

### Modifications

Avoid the race condition between `cursor.initializeCursorPosition` and `internalTrimLedgers`.

(cherry picked from commit 4139fef29b040c90e8c1def0cec69410f43bab76)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a8736ea666c..049016b15ea 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -963,12 +963,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     public void operationComplete() {
                         log.info("[{}] Opened new cursor: {}", name, cursor);
                         cursor.setActive();
-                        // Update the ack position (ignoring entries that were written while the cursor was being created)
-                        cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
-                                ? getFirstPositionAndCounter()
-                                : getLastPositionAndCounter());
-
                         synchronized (ManagedLedgerImpl.this) {
+                            // Update the ack position (ignoring entries that were written while the cursor was being created)
+                            cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
+                                    ? getFirstPositionAndCounter()
+                                    : getLastPositionAndCounter());
                             addCursor(cursor);
                             uninitializedCursors.remove(cursorName).complete(cursor);
                         }
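
The effect of moving the position initialization into the `synchronized` block can be sketched with a toy model. Everything below (`MiniLedger`, `RaceSketch`, and their methods) is hypothetical and heavily simplified, not Pulsar code. It assumes the trim path runs under the same monitor as cursor registration, and it replays the three-step interleaving from the table sequentially instead of using real threads, so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model (hypothetical): a set of ledger ids plus the ledgers that registered cursors point at.
class MiniLedger {
    private final TreeSet<Long> ledgerIds = new TreeSet<>();
    private final List<Long> cursorLedgers = new ArrayList<>();

    MiniLedger(long firstId, long lastId) {
        for (long id = firstId; id <= lastId; id++) {
            ledgerIds.add(id);
        }
    }

    // Pre-fix step 1: the earliest position is chosen without holding the lock.
    long pickEarliestUnlocked() {
        return ledgerIds.first();
    }

    // Pre-fix step 3: only the registration happens under the lock.
    synchronized void register(long ledgerId) {
        cursorLedgers.add(ledgerId);
    }

    // Post-fix: choosing the position and registering the cursor form one critical section.
    synchronized long openCursorAtomically() {
        long id = ledgerIds.first();
        cursorLedgers.add(id);
        return id;
    }

    // Trim: drop every ledger older than the slowest registered cursor, always keeping the newest.
    synchronized void trim() {
        long keepFrom = ledgerIds.last();
        for (long cursorLedger : cursorLedgers) {
            keepFrom = Math.min(keepFrom, cursorLedger);
        }
        ledgerIds.headSet(keepFrom).clear();
    }

    synchronized boolean exists(long ledgerId) {
        return ledgerIds.contains(ledgerId);
    }
}

class RaceSketch {
    // Replays the table: set position (time 1), trim (time 2), register cursor (time 3).
    static boolean preFixCursorIsValid() {
        MiniLedger ml = new MiniLedger(1, 5);
        long pos = ml.pickEarliestUnlocked();
        ml.trim();
        ml.register(pos);
        return ml.exists(pos);
    }

    // With one critical section, trim can only run before or after it; here it runs after.
    static boolean postFixCursorIsValid() {
        MiniLedger ml = new MiniLedger(1, 5);
        long pos = ml.openCursorAtomically();
        ml.trim();
        return ml.exists(pos);
    }

    public static void main(String[] args) {
        System.out.println("pre-fix valid=" + preFixCursorIsValid());
        System.out.println("post-fix valid=" + postFixCursorIsValid());
    }
}
```

In the pre-fix ordering the trim sees no registered cursor and removes the ledger the new cursor is about to reference. Once both steps share one critical section, the trim either runs first (the cursor then starts at the surviving ledger) or runs afterwards and keeps the ledger the cursor points at.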
