BewareMyPower opened a new pull request, #25998:
URL: https://github.com/apache/pulsar/pull/25998

   ### Motivation
   
   We observed the system topic reader hitting a typical bug: 
`hasMessageAvailable()` returns true but `readNext()` cannot read any 
message. This causes all topics in the same namespace to fail to load.
   
   After digging into the details, I figured out the cause of this incident. 
During the incident, there were two unload operations on bundles of the same 
namespace within a short time:
   1. Unloading the 1st bundle unloaded a user topic, and loading that topic 
again triggered a read on the `__change_events` topic.
   2. The system topic reader started reading from the earliest position and 
read a message from `broker-0` whose message ID is `A:2`.
   3. Unloading the 2nd bundle moved the `__change_events` topic from 
`broker-0` to `broker-1`.
   4. The system topic reader then reconnected to `broker-1`, with `A:2` as the 
start message id.
   
   Then the reader was stuck. From the heap dump, I found the following 
important info from the dispatcher:
   
   ```yaml
   isFirstRead: true
   activeConsumer:
     msgOutCounter: 0
     messagePermits: 1000
     startMessageId: "A:2"
     messageAckCounter: 1
   cursor:
     ledger:
       ledgers:
       - {"ledgerId":B,"entries":0}
       lastConfirmedEntry: "B:-1"
     entriesReadCount: 0
     readPosition: "B:0"
     waitingReadOp:
       entries: []
       readPosition: "B:0"
       nextReadPosition: "B:0"
   ```
   
   As you can see, all entries had been compacted (the original ledger ID is 
`A`), and the managed ledger contained only an empty ledger whose ID is `B`.
   
   From the logs, we can see that the cursor's initial mark delete position 
and read position were different:
   
   ```
   ackPos=A:1, readPos=A:2
   ```
   
   In the heap dump, however, `ackPos` is `A:2` and `readPos` is `B:0`. I also 
checked the topic compaction context's cache and dumped the compacted ledger to 
verify that there was not even a `findStartPoint` call on the compacted ledger.
   
   The root cause is:
   1. When the reader received a message from `broker-0`, it stored the message 
ID `A:2` in `PersistentAcknowledgmentsGroupingTracker#lastCumulativeAck` via 
`acknowledgeCumulativeAsync`.
   2. After 100 ms (the default `acknowledgmentGroupTime`), by which time the 
reader had already switched its connection to `broker-1`, it flushed the ACK 
request to `broker-1`.
   3. `broker-1` received the ACK request and simply called 
`cursor.asyncMarkDelete(A:2)`.
   
   
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L474
   
   The mark delete position was simply set to `A:2`:
   
   
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2216
   
   However, the read position was advanced to **the next valid position** 
(`B:0`) in the managed ledger:
   
   
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2225
   
   while the compaction horizon is just `B:-1`, which is behind the new read 
position:
   
   
https://github.com/apache/pulsar/blob/a045e4c279505fa842da0135c839c78c4cf6c3c4/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java#L55-L56
   
   so the read operation went directly through the managed cursor instead of 
the compacted ledger and was stuck forever.
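
   The chain of events above can be modeled with a small, self-contained 
sketch. It is not Pulsar code: `StuckReaderSketch`, `nextValidPosition`, and 
`readsFromCompactedLedger` are hypothetical names, the ledger IDs `A` and `B` 
are replaced by the assumed values 10 and 20, and the routing rule (read the 
compacted ledger only while the read position is not past the compaction 
horizon) is a simplification of the check linked above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of the incident; NOT Pulsar's real classes. */
final class StuckReaderSketch {
    // Hypothetical numeric stand-ins for the ledger IDs `A` and `B`.
    static final long A = 10L;
    static final long B = 20L;

    /** Orders positions by (ledgerId, entryId); a position is a long[2]. */
    static int compare(long[] p, long[] q) {
        int c = Long.compare(p[0], q[0]);
        return c != 0 ? c : Long.compare(p[1], q[1]);
    }

    /**
     * Assumed behavior: the first position after markDelete that exists (or
     * will exist) in the managed ledger. `ledgers` maps ledgerId to entry
     * count and must contain at least one ledger.
     */
    static long[] nextValidPosition(TreeMap<Long, Long> ledgers, long[] markDelete) {
        Long entries = ledgers.get(markDelete[0]);
        if (entries != null && markDelete[1] + 1 < entries) {
            return new long[] {markDelete[0], markDelete[1] + 1};
        }
        for (Map.Entry<Long, Long> e : ledgers.tailMap(markDelete[0], false).entrySet()) {
            if (e.getValue() > 0) {
                return new long[] {e.getKey(), 0};
            }
        }
        // Nothing left to read: point at the next entry to be written.
        Map.Entry<Long, Long> last = ledgers.lastEntry();
        return new long[] {last.getKey(), last.getValue()};
    }

    /** Assumed routing rule: use the compacted ledger only up to the horizon. */
    static boolean readsFromCompactedLedger(long[] readPosition, long[] horizon) {
        return compare(readPosition, horizon) <= 0;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(B, 0L); // ledger A was compacted away; only empty ledger B remains
        long[] horizon = {B, -1};

        long[] readBeforeAck = {A, 2};
        long[] readAfterAck = nextValidPosition(ledgers, new long[] {A, 2});

        System.out.println(readsFromCompactedLedger(readBeforeAck, horizon)); // true
        System.out.println(readsFromCompactedLedger(readAfterAck, horizon));  // false
    }
}
```

   In this model the late ACK moves the read position from `A:2` to `B:0`, 
which is past the horizon `B:-1`, so the next read goes to the empty managed 
ledger and waits.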
   
   ### Modifications
   
   When a subscription has a compacted consumer and the acknowledged position's 
ledger ID does not exist in the managed ledger, ignore the acknowledgment. 
Without this check, even if the next valid ledger has messages to read, the 
compacted entries could be skipped, which leads to message loss.
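
   As a rough illustration of that rule (not the code in this PR), the 
decision can be written as a pure function. `AckGuardSketch`, 
`shouldApplyAck`, and its parameters are hypothetical names, and the ledger 
IDs 10 and 20 are assumed stand-ins for `A` and `B`.

```java
import java.util.Collections;
import java.util.Set;

/** Sketch of the guard described above; hypothetical names, not the PR's code. */
final class AckGuardSketch {
    /**
     * Returns true if the cumulative ACK should move the mark delete position.
     * The ACK is ignored only when the subscription has a compacted consumer
     * and the acknowledged ledger is unknown to the managed ledger.
     */
    static boolean shouldApplyAck(boolean hasCompactedConsumer,
                                  Set<Long> ledgerIds,
                                  long ackedLedgerId) {
        if (hasCompactedConsumer && !ledgerIds.contains(ackedLedgerId)) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        long a = 10L; // stand-in for the compacted ledger A
        long b = 20L; // stand-in for the empty ledger B
        Set<Long> ledgerIds = Collections.singleton(b);

        System.out.println(shouldApplyAck(true, ledgerIds, a));  // false
        System.out.println(shouldApplyAck(true, ledgerIds, b));  // true
        System.out.println(shouldApplyAck(false, ledgerIds, a)); // true
    }
}
```

   In the incident, the ACK for `A:2` would fall into the first case and be 
dropped, leaving the read position at `A:2`.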
   
   ### Verifying this change
   
   `testReceiveAckAfterReconnectionOnEmptyLedger` covers the case exactly.

