This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4139fef29b0 [fix] [ml] Topics stats shows msgBacklog but there reality no backlog (#19275)
4139fef29b0 is described below
commit 4139fef29b040c90e8c1def0cec69410f43bab76
Author: fengyubiao
AuthorDate: Fri Jan 20 07:47:29 2023 +0800
[fix] [ml] Topics stats shows msgBacklog but there reality no backlog (#19275)
### Motivation
#### 1. `readPosition` points to a deleted ledger
When `trim ledgers` and `create new cursor` are executed concurrently, the `readPosition` of the new cursor can end up pointing to a deleted ledger:
| time | `trim ledgers` | `create new cursor` |
| --- | --- | --- |
| 1 | | set read position and mark deleted position |
| 2 | delete ledger | |
| 3 | | add the cursor to `ManagedLedger.cursors` |
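The interleaving in the table can be replayed with a toy model. This is a hypothetical, heavily simplified sketch: `ToyManagedLedger`, `ToyCursor`, `trimLedgers(keepFrom)` and `replay` are illustrative names, not Pulsar APIs. It assumes that trimming only protects ledgers still needed by cursors that are already registered, so a cursor whose read position is set before a trim but registered after it ends up pointing to a deleted ledger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, heavily simplified model of the race; these classes are NOT Pulsar APIs.
class TrimRaceDemo {

    /** Toy cursor: only tracks the ledger id of its read position. */
    static final class ToyCursor {
        long readLedgerId = -1;
    }

    /** Toy managed ledger: ledger id -> entry count, plus the cursors registered so far. */
    static final class ToyManagedLedger {
        final TreeMap<Long, Integer> ledgers = new TreeMap<>();
        final List<ToyCursor> cursors = new ArrayList<>();

        /**
         * Deletes every ledger older than keepFrom unless a registered cursor still reads it.
         * A cursor that is not yet in `cursors` is invisible here.
         */
        void trimLedgers(long keepFrom) {
            long slowest = keepFrom;
            for (ToyCursor c : cursors) {
                slowest = Math.min(slowest, c.readLedgerId);
            }
            ledgers.headMap(slowest).clear(); // removes all ledger ids < slowest
        }
    }

    /**
     * Replays "create new cursor" against "trim ledgers". If trimBetween is true, the trim runs
     * between setting the read position and registering the cursor (the interleaving in the table).
     * Returns true if the cursor's read position points to a deleted ledger.
     */
    static boolean replay(boolean trimBetween) {
        ToyManagedLedger ml = new ToyManagedLedger();
        for (long id = 1; id <= 5; id++) {
            ml.ledgers.put(id, 10);
        }
        ToyCursor cursor = new ToyCursor();

        cursor.readLedgerId = ml.ledgers.firstKey(); // time 1: read position -> ledger 1
        if (trimBetween) {
            ml.trimLedgers(5);                       // time 2: ledgers 1~4 are deleted
        }
        ml.cursors.add(cursor);                      // time 3: register the cursor
        if (!trimBetween) {
            ml.trimLedgers(5);                       // trimming after registration keeps ledger 1
        }
        return !ml.ledgers.containsKey(cursor.readLedgerId);
    }

    public static void main(String[] args) {
        System.out.println("race interleaving -> dangling read position: " + replay(true));
        System.out.println("trim after registration -> dangling read position: " + replay(false));
    }
}
```

In this model, only the ordering matters: the same trim is harmless once the cursor is registered, because the cursor then holds back the deletion of ledger 1.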
----
#### 2. Wrong backlog caused by the wrong `readPosition`
<strong>(Highlight)</strong> Since the cursor's read position points at a deleted ledger, the messages in the deleted ledgers will never be consumed or acknowledged through this cursor. The backlog in the `topics stats` API response is calculated as `managedLedger.entriesAddedCounter - cursor.messagesConsumedCounter`, so those messages remain counted as backlog: topics stats shows a `msgBacklog` even though in reality there is no backlog.
- `managedLedger.entriesAddedCounter`: Pulsar sets it to `0` when creating a new managed ledger and increments it when entries are added.
- `cursor.messagesConsumedCounter`: Pulsar sets it to `0` when creating a new cursor and increments it when messages are acknowledged.
For example:
- write entries to the managed ledger: `{1:0~1:9}...{5:0~5:9}`
  - `managedLedger.entriesAddedCounter` is `50` now
- create a new cursor and set its read position to `1:0`
  - `cursor.messagesConsumedCounter` is `0` now
- delete ledgers `1~4`
- consume all messages
  - only the messages `{5:0~5:9}` can be consumed, so `cursor.messagesConsumedCounter` is `10` now
- the `backlog` in the `topics stats` response is `50 - 10 = 40`, but in reality there is no backlog
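The arithmetic of the example can be checked with a small sketch. It only models the two counters and the formula `entriesAddedCounter - messagesConsumedCounter`; the class `BacklogExample` and its methods are hypothetical, not Pulsar code.

```java
// Hypothetical toy model of the two counters in the example; not Pulsar code.
class BacklogExample {
    static final int LEDGERS = 5;
    static final int ENTRIES_PER_LEDGER = 10;

    /** Backlog as computed for `topics stats`: entriesAddedCounter - messagesConsumedCounter. */
    static long reportedBacklog(long entriesAddedCounter, long messagesConsumedCounter) {
        return entriesAddedCounter - messagesConsumedCounter;
    }

    /** Replays the example; returns {reported backlog, entries really left to consume}. */
    static long[] replayExample() {
        // Write {1:0~1:9}...{5:0~5:9}: entriesAddedCounter becomes 50.
        long entriesAddedCounter = (long) LEDGERS * ENTRIES_PER_LEDGER;
        // Create a new cursor at read position 1:0: messagesConsumedCounter starts at 0.
        long messagesConsumedCounter = 0;
        // Delete ledgers 1~4: only the 10 entries of ledger 5 can still be read.
        long readableEntries = ENTRIES_PER_LEDGER;
        // Consume and acknowledge everything that can still be read.
        long consumed = readableEntries;
        messagesConsumedCounter += consumed;

        long reported = reportedBacklog(entriesAddedCounter, messagesConsumedCounter);
        long actual = readableEntries - consumed;
        return new long[] {reported, actual};
    }

    public static void main(String[] args) {
        long[] result = replayExample();
        // prints "reported backlog: 40, actual backlog: 0"
        System.out.println("reported backlog: " + result[0] + ", actual backlog: " + result[1]);
    }
}
```

The 40 entries of ledgers `1~4` were counted by `entriesAddedCounter` but can never be acknowledged through this cursor, so they never reach `messagesConsumedCounter`.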
----
#### 3. Reproduce issue
Sorry, I spent 4 hours trying to write a non-invasive test, but failed.
<strong>(Highlight)</strong> You can reproduce it with `testBacklogIfCursorCreateConcurrentWithTrimLedger` in PR #19274:
https://github.com/apache/pulsar/blob/a2cdc759fc2710e4dd913eb0485d23ebcaa076a4/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/StatsBackLogTest.java#L163
### Modifications
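Avoid the race condition between `cursor.initializeCursorPosition` and `internalTrimLedgers` by moving `cursor.initializeCursorPosition` into the `synchronized (ManagedLedgerImpl.this)` block that adds the cursor to `ManagedLedger.cursors`, so that the read position is initialized and the cursor is registered in one critical section.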
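A minimal sketch of the idea behind the fix, using a hypothetical toy model (`LockedCursorInit`, `openCursor`, `trimLedgers` and `everDangling` are illustrative names, not Pulsar code). The read position is initialized and the cursor is registered inside one `synchronized` block, and trimming is assumed to synchronize on the same monitor, so a trim runs either entirely before the position is computed or entirely after the cursor is registered. Racing the two operations then never leaves a dangling read position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model of the fix; names are illustrative, NOT Pulsar APIs.
// Assumption: trimming synchronizes on the same monitor as cursor registration.
class LockedCursorInit {
    static final class ToyCursor {
        long readLedgerId = -1;
    }

    private final TreeMap<Long, Integer> ledgers = new TreeMap<>();
    private final List<ToyCursor> cursors = new ArrayList<>();

    LockedCursorInit(int ledgerCount) {
        for (long id = 1; id <= ledgerCount; id++) {
            ledgers.put(id, 10);
        }
    }

    /** Mirrors the patched callback: initialize the position and register the cursor under one lock. */
    ToyCursor openCursor() {
        ToyCursor cursor = new ToyCursor();
        synchronized (this) {
            cursor.readLedgerId = ledgers.firstKey(); // "Earliest" initial position
            cursors.add(cursor);
        }
        return cursor;
    }

    /** Deletes ledgers older than keepFrom that no registered cursor still needs, under the same lock. */
    synchronized void trimLedgers(long keepFrom) {
        long slowest = keepFrom;
        for (ToyCursor c : cursors) {
            slowest = Math.min(slowest, c.readLedgerId);
        }
        ledgers.headMap(slowest).clear();
    }

    synchronized boolean isDangling(ToyCursor cursor) {
        return !ledgers.containsKey(cursor.readLedgerId);
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining " + t.getName(), e);
        }
    }

    /** Races openCursor() against trimLedgers(); returns true if a dangling read position was ever seen. */
    static boolean everDangling(int rounds) {
        for (int i = 0; i < rounds; i++) {
            LockedCursorInit ml = new LockedCursorInit(5);
            ToyCursor[] opened = new ToyCursor[1];
            Thread opener = new Thread(() -> opened[0] = ml.openCursor());
            Thread trimmer = new Thread(() -> ml.trimLedgers(5));
            opener.start();
            trimmer.start();
            join(opener); // join() makes opened[0] visible to this thread
            join(trimmer);
            if (ml.isDangling(opened[0])) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("dangling read position observed: " + everDangling(1_000));
    }
}
```

If the trim wins the lock, the cursor starts at the first surviving ledger; if the cursor wins, the trim sees it and keeps ledger 1. In this model, no interleaving produces a dangling read position.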
---
.../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cf109297389..a9bfdd6226f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -993,12 +993,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
cursor.setActive();
- // Update the ack position (ignoring entries that were written while the cursor was being created)
- cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
- ? getFirstPositionAndCounter()
- : getLastPositionAndCounter());
-
synchronized (ManagedLedgerImpl.this) {
+ // Update the ack position (ignoring entries that were written while the cursor was being created)
+ cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
+ ? getFirstPositionAndCounter()
+ : getLastPositionAndCounter());
addCursor(cursor);
uninitializedCursors.remove(cursorName).complete(cursor);
}