poorbarcode opened a new issue, #20277: URL: https://github.com/apache/pulsar/issues/20277
# Background knowledge & Motivation

The Cursor is used to read data from ledgers. There are three scenarios for reading messages (we can call these scenarios Read-Scenario):

- **Sequentially Read**: reads some entries starting at the `read position` and, when the read finishes, updates the `read position` to the next position to be read.
- **Message Replay Read**: if some entries must be delivered again (for example, a consumer is closed while the messages it holds are still unacknowledged), the Cursor caches their positions in memory. When other live consumers need more entries, it reads the entries at these cached positions.
- **Specified Read**: the user uses an API like [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) to read a specified entry.

There are these scenarios for updating the `read position` of the cursor (we can call these scenarios Position-Update-Scenario):

- **Sequentially Position Update**: updates the `read position` when a Sequentially Read finishes.
- **Manual Position Update**: resets the start position of the sequential read.
  - [`consumer.seek`](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482) of the Pulsar client: sets the `read position` to a specified value.
  - [`pulsar-admin topics reset-cursor`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): sets the `read position` to a specified value.
  - `cursor.rewind` (an internal API)<sup>[1]</sup>: sets the `read position` to the position after the `mark deleted position`.
- **Additional Position Update**: there is a compensation mechanism<sup>[2]</sup> in the Message Replay Read scenario that modifies the `read position` of the cursor to avoid reading the entry at the `read position` twice.

<strong>[1]</strong>: `cursor.rewind` is an internal API.
It is used in these three scenarios:

- When there are no consumers on the dispatcher of a subscription, the dispatcher gives up all of its in-memory state (such as messages waiting for redelivery and delayed messages) and resets the `read position` to `{mark deleted position} + 1`, as if the dispatcher were a new one. We can call this mechanism Dispatcher Reset.
- The compaction task reads all the data twice: first to find the keys that need to be retained, and second to dump the data to a new ledger. After the first read is complete, it resets the read position with `rewind`.
- The replication task reads messages from the original cluster and sends them to the remote cluster. After the data is sent successfully, the replication task acknowledges these messages. If something goes wrong, it relies on `rewind` to reset the `read position` to `mark deleted position + 1` and restarts the task.

<strong>[2]</strong>: If something goes wrong, the replay queue can contain a position equal to the `read position`. https://github.com/apache/pulsar/issues/6403 provides an additional mechanism that advances the `read position` before reading, so the entry is not read twice (once by Message Replay Read and once by Sequentially Read).

# Motivation

For the scenario **Sequentially Read**: ideally, the `read position` of the cursor would be modified sequentially, which guarantees the order in which messages are read. In practice, the `read position` can be modified concurrently, which makes message reading out of order. If the scenarios above run concurrently, the `read position` may end up at an incorrect value.
For example:

| time | `thread-message-handle` | `thread-Sequentially Read` |
| --- | --- | --- |
| 1 | | Read `5~10` completes (`mark delete position` is `4` and `read position` is `11` now) |
| 2 | | Do callback |
| 3 | Try to handle messages `5~10` | Start a new read `11~20` |
| 4 | Something goes wrong and the processor cannot process message `5` | |
| 5 | Call `cursor.rewind` to process `5~10` again (`mark delete position` is `4` and `read position` is `5` now) | |
| 6 | | The second read `11~20` completes. Set `read position` to `21` |
| 7 | | Do callback |
| 8 | Try to handle messages `11~20`. <strong>(Highlight)</strong> At this moment, messages `5~10` have been skipped and are never processed | |

For the issue above, there is a mechanism<sup>[3]</sup> that tries to fix it, but it does not fully solve the problem.

<strong>[3]</strong>: In the multi-consumer dispatcher there is a marker. When Dispatcher Reset is triggered, this marker is set to `true`. After the in-flight read completes, the dispatcher discards the entries it returned and triggers a new `rewind`. We can call this mechanism RewindIfReadingOutdated. But it cannot cover everything, for example:

- If there is more than one in-flight read (one Sequentially Read and some Message Replay Reads might exist simultaneously), this mechanism does not work.
- There is no such mechanism in the single-consumer dispatcher.
- It only covers one case of **Manual Position Update**; it does not work for the other cases.

# Goals

- The `read position` of the cursor is a field designed for **Sequentially Read**, and it should only be changed in the scenario **Sequentially Read**.
- If the `read position` is changed by a **Manual Position Update**, the in-flight reads should be discarded and must not change the `read position` anymore.
- Disallow changing the `read position` in the scenario **Specified Read**.
The `read position` is

- Add a marker that indicates how many times the `read position` of the cursor has been changed by a **Manual Position Update**. We can call this marker the epoch of the cursor.
- A **Sequentially Read** carries the epoch. If the marker has been updated by the time the read completes, discard the entries that were read and do not update the `read position`.

## In Scope

Fix the issue where sequential reads lose order due to concurrent modification of the `read position`.

## Out of Scope

Nothing.

# Design

To improve the mechanism RewindIfReadingOutdated, we can make the marker a counter of in-flight reads instead of a boolean value, discard all the in-flight reads, and call `rewind` after the last in-flight read completes.

As a further optimization, we can use a pointer to mark which reads complete after a **Manual Position Update**. If a read-complete event arrives after this pointer, discard the entries and skip the `read position` update (instead of calling `rewind` or another additional `read position` reset).

### Summary

- Add a marker that indicates how many times the `read position` of the cursor has been changed by a **Manual Position Update**. We can call this marker the epoch of the cursor.
- A **Sequentially Read** carries the epoch. If the marker has been updated by the time the read completes, discard the entries that were read and do not update the `read position`.
- Remove the code that changes the `read position` in the scenario **Specified Read**.

### Metrics

<!--
For each metric provide:
* Full name
* Description
* Attributes (labels)
* Unit
-->

# Monitoring

<!--
Describe how the changes you make in this proposal should be monitored.
Don't describe the detailed metrics - they should be at "Public-facing Changes" / "Metrics" section
Describe how the user will use the metrics to monitor the feature: Which alerts they should set up, which thresholds, ...
-->

# Backward & Forward Compatibility

Nothing needs special care.
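
The epoch mechanism summarized in the Design section can be sketched as follows. This is only a minimal illustration under stated assumptions, not Pulsar's actual cursor code: the class `EpochCursorSketch`, the `ReadTicket` type, and all method names are hypothetical, and positions are modeled as plain `long` entry IDs rather than ledger/entry pairs.

```java
/** Hypothetical sketch of an epoch-guarded read position; not Pulsar's cursor implementation. */
final class EpochCursorSketch {
    private long readPosition; // next entry to be read sequentially
    private long epoch;        // incremented by every Manual Position Update

    EpochCursorSketch(long initialReadPosition) {
        this.readPosition = initialReadPosition;
    }

    /** Token that a Sequentially Read carries from start to completion. */
    static final class ReadTicket {
        final long epoch;
        final long firstEntry;
        final long lastEntry;

        ReadTicket(long epoch, long firstEntry, long lastEntry) {
            this.epoch = epoch;
            this.firstEntry = firstEntry;
            this.lastEntry = lastEntry;
        }
    }

    /** Starts a sequential read of {@code count} entries and records the current epoch. */
    synchronized ReadTicket startSequentialRead(int count) {
        return new ReadTicket(epoch, readPosition, readPosition + count - 1);
    }

    /** Manual Position Update (seek, reset-cursor, rewind): moves the position and outdates in-flight reads. */
    synchronized void manualPositionUpdate(long newReadPosition) {
        epoch++;
        readPosition = newReadPosition;
    }

    /**
     * Completes a sequential read.
     * Returns true if the entries may be dispatched, false if they must be discarded.
     */
    synchronized boolean completeSequentialRead(ReadTicket ticket) {
        if (ticket.epoch != epoch) {
            // The read started before a Manual Position Update: drop the entries
            // and leave the read position untouched.
            return false;
        }
        readPosition = ticket.lastEntry + 1;
        return true;
    }

    synchronized long getReadPosition() {
        return readPosition;
    }
}
```

Replaying the timeline from the Motivation section against this sketch: the read of `11~20` takes its ticket before the rewind to `5`, so when it completes, its epoch no longer matches, the entries are discarded, and the `read position` stays at `5` instead of jumping to `21`.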
