BewareMyPower opened a new pull request, #25998:
URL: https://github.com/apache/pulsar/pull/25998
### Motivation
We observed that the system topic reader hit a typical bug:
`hasMessageAvailable()` returns true but `readNext()` cannot read any
message. As a result, all topics in the same namespace failed to load.
After digging into the details, I figured out the cause of this incident.
During the incident, there were two unload operations on bundles of the same
namespace within a short time:
1. Unloading the 1st bundle unloaded a user topic, and loading that topic again
triggered a read on the `__change_events` topic.
2. The system topic reader started reading from the earliest position and read a
message from `broker-0` whose message ID is `A:2`.
3. Unloading the 2nd bundle moved the `__change_events` topic from `broker-0` to
`broker-1`.
4. The system topic reader then reconnected to `broker-1` with `A:2` as the
start message ID.
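Steps 2 to 4 interact with the client's grouped acknowledgments, which are explained in the root cause analysis: a cumulative ACK is only remembered at first and is sent later over whatever connection is current at flush time. The sketch below models just that ordering. `GroupedAckSketch`, `Tracker`, `MessageId` and their methods are hypothetical stand-ins, not the Pulsar client API, and the timer is replaced by an explicit `flush()` call.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupedAckSketch {

    // Hypothetical message ID stand-in printed as "ledger:entry".
    record MessageId(String ledger, long entry) {
        @Override
        public String toString() {
            return ledger + ":" + entry;
        }
    }

    // Hypothetical, heavily simplified grouping tracker: a cumulative ACK is only
    // remembered, and a later flush sends the latest one over the *current* connection.
    static final class Tracker {
        String currentBroker;
        MessageId lastCumulativeAck;
        final List<String> sent = new ArrayList<>();

        Tracker(String broker) {
            this.currentBroker = broker;
        }

        void acknowledgeCumulative(MessageId id) {
            lastCumulativeAck = id;
        }

        void reconnect(String broker) {
            currentBroker = broker;
        }

        // Stands in for the periodic flush (acknowledgmentGroupTime, 100 ms by default).
        void flush() {
            if (lastCumulativeAck != null) {
                sent.add(currentBroker + " <- ACK " + lastCumulativeAck);
                lastCumulativeAck = null;
            }
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker("broker-0");
        tracker.acknowledgeCumulative(new MessageId("A", 2)); // step 2: A:2 read from broker-0
        tracker.reconnect("broker-1");                       // steps 3-4: topic moved, reader reconnects
        tracker.flush();                                      // the group timer fires after reconnection
        System.out.println(tracker.sent);                     // prints [broker-1 <- ACK A:2]
    }
}
```

The ACK for a message read from `broker-0` ends up being delivered to `broker-1`, which is the precondition for the stuck read described below.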
Then the reader got stuck. The heap dump revealed the following
important state of the dispatcher:
```yaml
isFirstRead: true
activeConsumer:
msgOutCounter: 0
messagePermits: 1000
startMessageId: "A:2"
msgOutCounter: 0
messageAckCounter: 1
cursor:
ledger:
ledgers:
- {"ledgerId":B,"entries":0}
lastConfirmedEntry: "B:-1"
entriesReadCount: 0
readPosition: "B:0"
waitingReadOp:
entries: []
readPosition: "B:0"
nextReadPosition: "B:0"
```
As you can see, all entries had been compacted (the original ledger ID is `A`),
and the managed ledger was empty: it contained only an empty ledger whose ID is `B`.
The logs show that the cursor's initial mark delete position and read
position were different:
```
ackPos=A:1, readPos=A:2
```
However, in the heap dump `ackPos` is `A:2` and `readPos` is `B:0`. I also
checked the cache of the topic compaction context and dumped the compacted ledger,
which verified that `findStartPoint` was never even called on the compacted ledger.
The root cause is:
1. When the reader received a message from `broker-0`, it stored the message
ID `A:2` in `PersistentAcknowledgmentsGroupingTracker#lastCumulativeAck` via
`acknowledgeCumulativeAsync`.
2. After 100 ms (the default `acknowledgmentGroupTime`), by which time the reader
had already switched its connection to `broker-1`, it flushed the ACK request
to `broker-1`.
3. `broker-1` received the ACK request and simply called
`cursor.asyncMarkDelete(A:2)`.
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L474
The mark delete position was simply set to `A:2`:
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2216
However, the read position was advanced to **the next valid position**
(`B:0`) in the managed ledger:
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2225
while the compaction horizon is only `B:-1`, which is behind the new read
position:
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java#L55-L56
so the read operation went directly through the managed cursor and was
stuck forever.
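The chain above (stale mark delete, read position jumping to the next valid position, reads bypassing the compacted ledger) can be reproduced in a small model. This is a minimal sketch under stated assumptions, not the real `ManagedCursorImpl` or `CompactedTopicUtils` logic: `Position`, `LedgerInfo`, `Cursor`, `nextValidPosition` and `readsFromCompactedLedger` are hypothetical simplifications, and ledger IDs `10` and `11` are made-up stand-ins for `A` and `B`. The routing rule only encodes what is described here: when the compaction horizon is behind the read position, the cursor is read directly.

```java
import java.util.List;

public class StuckCompactedReadSketch {

    // Hypothetical position: (ledgerId, entryId), ordered by ledger first, then entry.
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Hypothetical ledger metadata: ledger ID and number of entries it holds.
    record LedgerInfo(long ledgerId, long entries) {}

    // Simplified "next valid position" over ledgers sorted by ID: the next entry in the same
    // ledger if it exists, otherwise entry 0 of the next existing ledger, otherwise the
    // position right after the last ledger's last entry.
    static Position nextValidPosition(List<LedgerInfo> ledgers, Position pos) {
        for (LedgerInfo info : ledgers) {
            if (info.ledgerId() == pos.ledgerId() && pos.entryId() + 1 < info.entries()) {
                return new Position(pos.ledgerId(), pos.entryId() + 1);
            }
            if (info.ledgerId() > pos.ledgerId()) {
                return new Position(info.ledgerId(), 0);
            }
        }
        LedgerInfo last = ledgers.get(ledgers.size() - 1);
        return new Position(last.ledgerId(), last.entries());
    }

    // Simplified routing: use the compacted ledger only while the read position is not beyond
    // the compaction horizon; otherwise read through the managed cursor directly.
    static boolean readsFromCompactedLedger(Position compactionHorizon, Position readPosition) {
        return compactionHorizon != null && compactionHorizon.compareTo(readPosition) >= 0;
    }

    static final class Cursor {
        final List<LedgerInfo> ledgers;
        Position markDeletePosition;
        Position readPosition;

        Cursor(List<LedgerInfo> ledgers, Position markDeletePosition, Position readPosition) {
            this.ledgers = ledgers;
            this.markDeletePosition = markDeletePosition;
            this.readPosition = readPosition;
        }

        // The mark delete position is taken as-is; the read position moves forward to the
        // next valid position when it would otherwise fall behind it.
        void markDelete(Position position) {
            markDeletePosition = position;
            Position next = nextValidPosition(ledgers, position);
            if (next.compareTo(readPosition) > 0) {
                readPosition = next;
            }
        }
    }

    public static void main(String[] args) {
        long a = 10, b = 11; // hypothetical IDs for ledgers A and B (A < B)
        List<LedgerInfo> ledgers = List.of(new LedgerInfo(b, 0)); // only the empty ledger B remains
        Position horizon = new Position(b, -1);
        // Initial cursor state from the logs: ackPos=A:1, readPos=A:2.
        Cursor cursor = new Cursor(ledgers, new Position(a, 1), new Position(a, 2));
        System.out.println("before ACK: read=" + cursor.readPosition
                + ", compacted read=" + readsFromCompactedLedger(horizon, cursor.readPosition));
        cursor.markDelete(new Position(a, 2)); // the stale ACK flushed to broker-1
        System.out.println("after ACK: markDelete=" + cursor.markDeletePosition
                + ", read=" + cursor.readPosition
                + ", compacted read=" + readsFromCompactedLedger(horizon, cursor.readPosition));
    }
}
```

Running it prints `before ACK: read=10:2, compacted read=true` followed by `after ACK: markDelete=10:2, read=11:0, compacted read=false`, matching the `A:2` / `B:0` state in the heap dump.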
### Modifications
When a subscription has a compacted consumer and the acknowledged position's
ledger ID does not exist in the managed ledger, the broker now does nothing with
the acknowledgment. Otherwise, even if the next valid ledger has messages to read,
the compacted entries could be skipped, which leads to message loss.
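The guard can be sketched as a single predicate. This is a minimal illustration assuming the broker can see whether the subscription has a compacted consumer and which ledger IDs the managed ledger currently holds; `CompactedAckGuardSketch`, `shouldIgnoreCumulativeAck` and `Position` are hypothetical names, not the actual code in `PersistentSubscription`, and `10`/`11` again stand in for ledgers `A` and `B`.

```java
import java.util.Set;

public class CompactedAckGuardSketch {

    // Hypothetical position stand-in: (ledgerId, entryId).
    record Position(long ledgerId, long entryId) {}

    // Hypothetical guard: ignore a cumulative ACK when the subscription has a compacted
    // consumer and the ACK points into a ledger that the managed ledger no longer contains.
    static boolean shouldIgnoreCumulativeAck(boolean hasCompactedConsumer,
                                             Set<Long> existingLedgerIds,
                                             Position ackPosition) {
        return hasCompactedConsumer && !existingLedgerIds.contains(ackPosition.ledgerId());
    }

    public static void main(String[] args) {
        Set<Long> ledgerIds = Set.of(11L);           // only the empty ledger B remains
        Position staleAck = new Position(10L, 2L);    // A:2, flushed after the reconnection
        System.out.println(shouldIgnoreCumulativeAck(true, ledgerIds, staleAck));                // true
        System.out.println(shouldIgnoreCumulativeAck(false, ledgerIds, staleAck));               // false
        System.out.println(shouldIgnoreCumulativeAck(true, ledgerIds, new Position(11L, 0L)));   // false
    }
}
```

Only the combination of a compacted consumer and a missing ledger skips the mark delete; ACKs on existing ledgers, or on subscriptions without compacted reads, take the normal path.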
### Verifying this change
`testReceiveAckAfterReconnectionOnEmptyLedger` covers exactly this case.
--
This is an automated message from the Apache Git Service.