poorbarcode opened a new issue, #20277:
URL: https://github.com/apache/pulsar/issues/20277

   # Background knowledge
   
   A cursor reads data from ledgers. There are three scenarios for reading messages (we call them Read-Scenarios):
   - **Sequentially Read**: read some entries starting at the `read position`, then update the `read position` to the next position to be read when the read finishes. 
   - **Message Replay Read**: if some entries must be delivered again (for example, a consumer is closed while the messages it holds are still unacknowledged), the cursor caches their positions in memory. When other live consumers need more entries, it reads the entries at those cached positions. 
   - **Specified Read**: the user calls an API such as [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) to read a specified entry.
   
   The `read position` of the cursor is updated in the following scenarios (we call them Position-Update-Scenarios):
   - **Sequentially Position Update**: the update made when a Sequentially Read finishes. 
   - **Manual Position Update**: resets the start position of the sequential read.
     - [`consumer.seek`](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482) of the Pulsar client: sets the `read position` to a specified value.
     - [`pulsar-admin topics reset-cursor`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): sets the `read position` to a specified value.
     - `cursor.rewind` (an internal API)<sup>[1]</sup>: sets the `read position` to the position right after the `mark deleted position`.
   - **Additional Position Update**: a compensation mechanism<sup>[2]</sup> in the Message Replay Read scenario modifies the `read position` of the cursor so that the entry at the `read position` is not read twice.
   
   <strong>[1]</strong>: `cursor.rewind` is an internal API. It is used in these three scenarios:
   - When a subscription's dispatcher has no consumers, the dispatcher discards all of its in-memory state (such as messages awaiting redelivery and delayed messages) and resets the `read position` to `{mark deleted position} + 1`, as if it were a new dispatcher. We call this mechanism Dispatcher Reset.
   - The compaction task reads all the data twice: first to find the keys that need to be retained, and then to write the data to a new ledger. After the first pass completes, it resets the read position with `rewind`.
   - The replication task reads messages from the original cluster and sends them to the remote cluster. After the data is sent successfully, the replication task acknowledges these messages. If something goes wrong, it relies on `rewind` to reset the `read position` to `{mark deleted position} + 1` and restarts the task.
   
   <strong>[2]</strong>: If, because of some fault, the replay queue contains a position equal to the `read position`, the same entry would be read twice (once by Message Replay Read and once by Sequentially Read). https://github.com/apache/pulsar/issues/6403 provides an additional mechanism that advances the `read position` before reading to avoid this. 
   
   # Motivation
   For the scenario **Sequentially Read**: ideally, the `read position` of the cursor is modified sequentially, which guarantees the order in which messages are read. In practice, the `read position` can be modified concurrently, which makes message reads go out of order. 
   
   If these scenarios run concurrently, the `read position` can end up with an incorrect value. For example: 
   | time | `thread-message-handle` | `thread-Sequentially Read` |
   | --- | --- | --- |
   | 1 | | read `5~10` completes (`mark delete position` is `4` and `read position` is `11` now) |
   | 2 | | do callback |
   | 3 | try to handle messages `5~10` | start a new read `11~20` |
   | 4 | something goes wrong and the processor cannot process message `5` | |
   | 5 | call `cursor.rewind` to process `5~10` again (`mark delete position` is `4` and `read position` is `5` now) | |
   | 6 | | the second read `11~20` completes and sets `read position` to `21` |
   | 7 | | do callback |
   | 8 | try to handle messages `11~20`. <strong>(Highlight)</strong> At this moment, messages `5~10` have been skipped | |
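
The interleaving in the table can be replayed deterministically in a single thread. The sketch below is a simplified model written for illustration only: `RacyCursor` and its methods are hypothetical names, not Pulsar's `ManagedCursor` API, and positions are modeled as plain `long` entry ids.

```java
// Hypothetical, simplified model of a cursor (not Pulsar's ManagedCursor).
final class RacyCursor {
    private final long markDeletePosition;
    private long readPosition;

    RacyCursor(long markDeletePosition) {
        this.markDeletePosition = markDeletePosition;
        this.readPosition = markDeletePosition + 1;
    }

    long getReadPosition() {
        return readPosition;
    }

    // A finished sequential read unconditionally moves the read position
    // past the last entry it read. This is the unsafe step.
    void completeSequentialRead(long lastEntryRead) {
        readPosition = lastEntryRead + 1;
    }

    // Manual Position Update: go back to the entry after the mark delete position.
    void rewind() {
        readPosition = markDeletePosition + 1;
    }

    // Replays the timeline from the table and returns the final read position.
    static long replayTimeline() {
        RacyCursor cursor = new RacyCursor(4);
        cursor.completeSequentialRead(10); // time 1: read 5~10 completes, read position = 11
        // time 3: the read 11~20 is in flight from here on
        cursor.rewind();                   // time 5: read position = 5
        cursor.completeSequentialRead(20); // time 6: stale read overwrites it with 21
        return cursor.getReadPosition();
    }

    public static void main(String[] args) {
        // prints "final read position: 21"
        System.out.println("final read position: " + replayTimeline());
    }
}
```

In this model the `rewind` at time 5 is silently overwritten at time 6, so the next sequential read starts at `21` and entries `5~10` are never read again.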
   
   There is a mechanism<sup>[3]</sup> that tries to fix the issue above, but it does not solve the problem completely.
   
   <strong>[3]</strong>: The multi-consumer dispatcher keeps a marker that is set to `true` when Dispatcher Reset is triggered. After the in-flight read completes, the dispatcher discards the messages it read and triggers a new `rewind`. We call this mechanism RewindIfReadingOutdated. It does not cover every case:
   - If there is more than one in-flight read (one Sequentially Read and several Message Replay Reads can exist simultaneously), the mechanism does not work. 
   - The single-consumer dispatcher has no such mechanism.
   - It handles only one scenario of **Manual Position Update** and does not work for the others.
   
   
   
   # Goals
   
   - The `read position` of the cursor is a field designed for **Sequentially Read**, and it should only be changed in the scenario **Sequentially Read**. 
   - If the `read position` is changed by a **Manual Position Update**, the in-flight reads should be discarded and must not change the `read position` anymore.
   - Disallow changing the `read position` in the scenario **Specified Read**.
   
   
   The `read position` is 
   
   - Add a marker that counts how many times the `read position` of the cursor has been changed by a **Manual Position Update**. We call this marker the epoch of the cursor.
   - Each **Sequentially Read** carries the epoch. If the epoch has changed by the time the read completes, discard the entries that were read and do not update the `read position`.
   
   ## In Scope
   
   Fix the issue where sequential reads lose their order because the `read position` is modified concurrently.
   
   ## Out of Scope
   
   nothing.
   
   
   # Design
   
   To improve the RewindIfReadingOutdated mechanism, we can turn the marker from a boolean into a counter of in-flight reads, discard all in-flight reads, and call `rewind` after the last in-flight read completes.
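
One possible reading of the counter-based variant is sketched below. It is an illustration under assumptions, not the dispatcher's actual code: `InFlightCountingDispatcher`, `startRead`, `requestReset`, and `completeRead` are hypothetical names, and every read is modeled as a sequential read for brevity.

```java
// Hypothetical, simplified model; names are illustrative, not Pulsar's dispatcher API.
final class InFlightCountingDispatcher {
    private final long markDeletedPosition;
    private long readPosition;
    private int inFlightReads;    // counter that replaces the boolean marker
    private boolean resetPending; // a reset arrived while reads were in flight

    InFlightCountingDispatcher(long markDeletedPosition) {
        this.markDeletedPosition = markDeletedPosition;
        this.readPosition = markDeletedPosition + 1;
    }

    synchronized long getReadPosition() {
        return readPosition;
    }

    synchronized void startRead() {
        inFlightReads++;
    }

    // Dispatcher Reset: rewind immediately if nothing is in flight,
    // otherwise defer the rewind until the last in-flight read completes.
    synchronized void requestReset() {
        if (inFlightReads == 0) {
            readPosition = markDeletedPosition + 1;
        } else {
            resetPending = true;
        }
    }

    // Returns true if the entries may be dispatched, false if they must be discarded.
    synchronized boolean completeRead(long lastEntryRead) {
        inFlightReads--;
        if (resetPending) {
            if (inFlightReads == 0) {
                // Last outdated read finished: now it is safe to rewind.
                resetPending = false;
                readPosition = markDeletedPosition + 1;
            }
            return false;
        }
        readPosition = lastEntryRead + 1;
        return true;
    }
}
```

In this sketch the rewind is delayed until every outdated read has returned, which is the cost the next paragraph's optimization tries to avoid.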
   
   As a further optimization, we can use a pointer that marks which reads complete after a **Manual Position Update**. If a read-complete event arrives after this pointer, discard both the entries and the `read position` update (instead of calling `rewind` or performing any other additional `read position` reset).
   
   ### Summary
   
   - Add a marker that counts how many times the `read position` of the cursor has been changed by a **Manual Position Update**. We call this marker the epoch of the cursor.
   - Each **Sequentially Read** carries the epoch. If the epoch has changed by the time the read completes, discard the entries that were read and do not update the `read position`.
   - Remove the code that changes the `read position` in the scenario **Specified Read**.
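
The epoch check summarized above could look roughly like the following. This is a minimal sketch under assumptions: `EpochCursor` and all of its methods are hypothetical names chosen for illustration, positions are plain `long` entry ids, and the real cursor would carry the epoch through its asynchronous read callbacks.

```java
// Hypothetical, simplified model of an epoch-guarded cursor (not Pulsar's ManagedCursor).
final class EpochCursor {
    private final long markDeletedPosition;
    private long readPosition;
    private long epoch; // incremented by every Manual Position Update

    EpochCursor(long markDeletedPosition) {
        this.markDeletedPosition = markDeletedPosition;
        this.readPosition = markDeletedPosition + 1;
    }

    synchronized long getReadPosition() {
        return readPosition;
    }

    // A sequential read captures the current epoch when it starts.
    synchronized long startSequentialRead() {
        return epoch;
    }

    // Returns true if the entries may be dispatched. If the epoch changed while
    // the read was in flight, the entries are discarded and the read position
    // is left untouched.
    synchronized boolean completeSequentialRead(long readEpoch, long lastEntryRead) {
        if (readEpoch != epoch) {
            return false;
        }
        readPosition = lastEntryRead + 1;
        return true;
    }

    // Manual Position Update: rewind to the entry after the mark deleted position.
    synchronized void rewind() {
        epoch++;
        readPosition = markDeletedPosition + 1;
    }

    // Manual Position Update: jump to an explicit position.
    synchronized void seek(long position) {
        epoch++;
        readPosition = position;
    }
}
```

Replaying the earlier timeline against this model, a read of `11~20` that started before `rewind` completes with an old epoch, so it is rejected and the `read position` stays at `5`. Unlike the counter-based variant, nothing has to wait for the last in-flight read before the position is reset.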
   
   
   ### Metrics
   
   
   # Monitoring
   
   # Backward & Forward Compatibility
   
   There are no compatibility concerns.
   
   
   

