This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4139fef29b0 [fix] [ml] Topics stats shows msgBacklog but there reality no backlog (#19275)
4139fef29b0 is described below

commit 4139fef29b040c90e8c1def0cec69410f43bab76
Author: fengyubiao <[email protected]>
AuthorDate: Fri Jan 20 07:47:29 2023 +0800

    [fix] [ml] Topics stats shows msgBacklog but there reality no backlog (#19275)
    
    ### Motivation
    
    #### 1. `readPosition` points to a deleted ledger
    
    When `trim ledgers` and `create new cursor` run concurrently, the cursor's `readPosition` can end up pointing to a deleted ledger.
    
    | time | `trim ledgers` | `create new cursor` |
    | --- | --- | --- |
    | 1 | | set read position and mark deleted position |
    | 2 | delete ledger | |
    | 3 | | add the cursor to `ManagedLedger.cursors` |
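    
    The interleaving in the table can be replayed deterministically with a small model. This is only a sketch under simplifying assumptions: `TrimRaceSketch`, its fields, and the trim rule (drop every ledger older than the slowest registered cursor, or keep only the newest ledger when no cursor is registered) are hypothetical and are not Pulsar's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified, hypothetical model of a managed ledger; none of these names are Pulsar APIs.
final class TrimRaceSketch {
    final TreeSet<Long> ledgers = new TreeSet<>();
    final List<Long> cursorReadLedgers = new ArrayList<>();

    TrimRaceSketch(long firstLedger, long lastLedger) {
        for (long id = firstLedger; id <= lastLedger; id++) {
            ledgers.add(id);
        }
    }

    // Assumed trim rule: drop ledgers older than the slowest registered cursor.
    // With no registered cursor, only the newest ledger is kept.
    void trim() {
        long keepFrom = cursorReadLedgers.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(ledgers.last());
        ledgers.headSet(keepFrom).clear();
    }

    // Replays the three steps from the table in order and reports whether
    // the ledger the cursor points to still exists afterwards.
    static boolean readLedgerSurvives() {
        TrimRaceSketch ml = new TrimRaceSketch(1, 5);
        long readLedger = ml.ledgers.first();   // time 1: set read position
        ml.trim();                              // time 2: trim cannot see the cursor yet
        ml.cursorReadLedgers.add(readLedger);   // time 3: register the cursor
        return ml.ledgers.contains(readLedger);
    }

    public static void main(String[] args) {
        System.out.println("read ledger still exists: " + readLedgerSurvives());
    }
}
```

    In this model the trim at time 2 sees no registered cursor, so it removes ledgers 1 to 4 and the cursor is left pointing at ledger 1.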
    
    ----
    
    #### 2. Wrong backlog caused by the wrong `readPosition`
    <strong>(Highlight)</strong> Because the cursor's read position points to a deleted ledger, the deleted messages will never be consumed or acknowledged. The backlog in the `topics stats` API response is calculated as `managedLedger.entriesAddedCounter - cursor.messagesConsumedCounter`, so topics stats shows a `msgBacklog` even though in reality there is no backlog.
    - `managedLedger.entriesAddedCounter`: Pulsar sets it to `0` when creating a new managed ledger and increments it when entries are added.
    - `cursor.messagesConsumedCounter`: Pulsar sets it to `0` when creating a new cursor and increments it when messages are acknowledged.
    
    For example:
    - write entries to the managed ledger: `{1:0~1:9}...{5:0~5:9}`
      - `managedLedger.entriesAddedCounter` is `50` now
    - create a new cursor, and set the read position to `1:0`
      -  `cursor.messagesConsumedCounter` is `0` now
    - delete ledgers `1~4`
    - consume all messages
      - only the messages `{5:0~5:9}` can be consumed, so `cursor.messagesConsumedCounter` is `10` now
    - the `backlog` in the `topics stats` response is `50 - 10 = 40`, but in reality there is no backlog
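    
    The arithmetic of this example can be written out as a short sketch. `BacklogExample` and its `backlog` helper are hypothetical names used only for illustration; the two parameters stand in for the counters described above.

```java
// Hypothetical helper that mirrors the formula quoted above; not a Pulsar class.
final class BacklogExample {
    // Backlog as described: entries added minus entries acknowledged.
    static long backlog(long entriesAddedCounter, long messagesConsumedCounter) {
        return entriesAddedCounter - messagesConsumedCounter;
    }

    public static void main(String[] args) {
        long entriesAdded = 5 * 10; // ledgers 1..5 with entries 0..9 each
        long consumed = 0;          // new cursor, read position 1:0

        // Ledgers 1..4 are deleted, so only 5:0..5:9 can be read and acknowledged.
        consumed += 10;

        System.out.println(backlog(entriesAdded, consumed)); // 40
    }
}
```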
    
    ----
    
    #### 3. Reproducing the issue
    Sorry, I spent 4 hours trying to write a non-invasive test, but failed. <strong>(Highlight)</strong> You can reproduce the issue with `testBacklogIfCursorCreateConcurrentWithTrimLedger` in PR #19274:
    
    
https://github.com/apache/pulsar/blob/a2cdc759fc2710e4dd913eb0485d23ebcaa076a4/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/StatsBackLogTest.java#L163
    
    ### Modifications
    
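    Avoid the race condition between `cursor.initializeCursorPosition` and `internalTrimLedgers` by moving the `cursor.initializeCursorPosition` call into the `synchronized (ManagedLedgerImpl.this)` block that also calls `addCursor`.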
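    
    The idea behind the change can be sketched with a small model in which choosing the read position and registering the cursor happen under the same lock that trimming takes. This is an illustration under stated assumptions, not the patched code: `AtomicCursorInitSketch`, its methods, and the trim rule are hypothetical, and the sketch assumes trimming synchronizes on the same monitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model; it assumes trim and cursor creation share one monitor.
final class AtomicCursorInitSketch {
    private final TreeSet<Long> ledgers = new TreeSet<>();
    private final List<Long> cursorReadLedgers = new ArrayList<>();

    AtomicCursorInitSketch(long firstLedger, long lastLedger) {
        for (long id = firstLedger; id <= lastLedger; id++) {
            ledgers.add(id);
        }
    }

    // Assumed trim rule: drop ledgers older than the slowest registered cursor,
    // or keep only the newest ledger when no cursor is registered.
    synchronized void trim() {
        long keepFrom = cursorReadLedgers.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(ledgers.last());
        ledgers.headSet(keepFrom).clear();
    }

    // Pick the read position and register the cursor in one critical section,
    // so a concurrent trim sees either no cursor at all or a fully registered one.
    long openCursor() {
        synchronized (this) {
            long readLedger = ledgers.first();
            cursorReadLedgers.add(readLedger);
            return readLedger;
        }
    }

    synchronized boolean hasLedger(long id) {
        return ledgers.contains(id);
    }

    // Runs trim and openCursor concurrently once and reports whether the
    // cursor's read ledger still exists afterwards.
    static boolean runOnce() {
        AtomicCursorInitSketch ml = new AtomicCursorInitSketch(1, 5);
        long[] readLedger = new long[1];
        Thread trimmer = new Thread(ml::trim);
        Thread opener = new Thread(() -> readLedger[0] = ml.openCursor());
        trimmer.start();
        opener.start();
        try {
            trimmer.join();
            opener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ml.hasLedger(readLedger[0]);
    }

    public static void main(String[] args) {
        System.out.println("read ledger still exists: " + runOnce());
    }
}
```

    Whichever thread wins the lock, the invariant holds in this model: if the trim runs first, the cursor starts at the only remaining ledger; if the cursor is registered first, the trim keeps its ledger.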
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cf109297389..a9bfdd6226f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -993,12 +993,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
                 cursor.setActive();
-                // Update the ack position (ignoring entries that were written while the cursor was being created)
-                cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
-                        ? getFirstPositionAndCounter()
-                        : getLastPositionAndCounter());
-
                 synchronized (ManagedLedgerImpl.this) {
+                    // Update the ack position (ignoring entries that were written while the cursor was being created)
+                    cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
+                            ? getFirstPositionAndCounter()
+                            : getLastPositionAndCounter());
                     addCursor(cursor);
                     uninitializedCursors.remove(cursorName).complete(cursor);
                 }
