poorbarcode commented on code in PR #20469:
URL: https://github.com/apache/pulsar/pull/20469#discussion_r1226110018

##########
pip/pip-269.md:
##########
@@ -0,0 +1,239 @@
+# Background knowledge
+
+The Cursor is used to read data from ledgers. There are three scenarios for reading messages (we call these scenarios Read-Scenarios):
+- **Sequentially Read**: reading some entries starting from the `read position` and updating the `read position` to the next position to be read when the read finishes.
+  - Read entries from the `read position` (inclusive); after the read completes, set the `read position` to the position following the last entry of this read, then start the next round of **Sequentially Read**.
+- **Message Replay Read**: Normally consumers acknowledge all the messages they receive, but sometimes a consumer cannot handle some messages and asks the broker to redeliver them (these messages are always earlier than the `read position`). E.g. calling `consumer.negativeAcknowledge`, or closing a consumer that still holds some messages, which are then redelivered to other consumers. The Message Dispatcher caches the positions of the messages that should be redelivered, and the Cursor reads the messages at those cached positions in the next round.
+- **Specified Read** (read directly using Managed Ledger): some APIs can read messages at a specified position, but they generally do not affect the attributes of the Cursor or the Message Dispatcher. E.g.
+  - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages)
+  - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id)
+
+**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**; **Message Replay Read** expects the `read position` to always be changed serially by itself.
+There are some cases that also change the `read position` of the Cursor, causing incorrect **Sequentially Read** and incorrect **Message Replay Read**. For example:
+- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set the `read position` to a specified value.
+- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it is used by the Pulsar Client.
+- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages.
+- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages)
+- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup> (an internal API): set the `read position` to the position following the `mark deleted position`.
+- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493) (an internal API): just set the `read position` to a specified point; it is only used for Compaction now.
+- Other read position edits:
+  - A compensation mechanism: if something goes wrong and the replay queue contains a position equal to the `read position`, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism that increases the `read position` before reading, to avoid a repeated read (read once by Message Replay Read and once by Sequentially Read).
+
+# Motivation
+
+If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order.
+For example:
+| time | **Occasional Read Position Change** | **Message Replay Read** |
+| --- | --- | --- |
+| 1 | | Start to read `[3:5, 3:6]` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages | |
+| 4 | Set read position to `3:0` | |
+| 5 | | Read complete, and send `[3:5, 3:6]` to the client |
+
+In this scenario (referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]`.
+
+---
+
+If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order and may also get stuck. For example:
+| time | **Occasional Read Position Change** | **Sequentially Read** |
+| --- | --- | --- |
+| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages | |
+| 4 | Set read position to `3:0` | |
+| 5 | | Read complete, set the read position to `3:10`, and send `[3:5 ~ 3:9]` to the client |
+
+In this scenario (referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first, and the messages `[3:0 ~ 3:4]` will not be delivered to the client until the topic is unloaded.
+
+---
+
+When an **Occasional Read Position Change** is executed, the attributes (including the `read position`) of the Cursor are changed (`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher are changed too (`action 2`)<sup>[3]</sup>. If these two actions are not executed atomically, the attributes of the Cursor become inconsistent with the attributes of the Dispatcher, which causes problems.
+For example:
+| time | **Reset cursor** | **Consumer reconnect** |
+| --- | --- | --- |
+| 1 | disconnect all consumers | |
+| 2 | | remove consumer |
+| 3 | | add consumer_1 |
+| 4 | | rewind cursor to `3:0` |
+| 5 | | add consumer_2 |
+| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> |
+| 7 | set read position to `1:0` | |
+
+In this scenario (referred to as **Scenario-3**), `consumer_2` will be stuck by the Recently Joined Consumers mechanism of Key_Shared mode<sup>[4]</sup>. This looks like a simple problem, but it becomes more complicated in the following cases:
+- If there is an in-flight reading.
+- If there is more than one **Occasional Read Position Change** action running.
+- Both of the above.
+
+---
+
+In the Multi Consumer Dispatcher, there is a mechanism that tries to fix the issue caused by `cursor.rewind` and **Sequentially Read** being executed concurrently (referred to as Rewind After In-flight Reading). It works like this: if there is an in-flight reading when `cursor.rewind` is called, the rewind is done after the in-flight reading finishes. However, this mechanism has the following drawbacks:
+- It introduces a new issue<sup>[5]</sup>.
+- Only applicable to `cursor.rewind`.
+- Only applicable to Multi Consumer Dispatcher.
+- If there is more than one in-flight reading (one Sequentially Read and some Message Replay Reads might exist simultaneously), this mechanism does not work.
+
+# Goals
+- For **Scenario-1** and **Scenario-2** (see Motivation), discard the in-flight reading (referred to as **Goal-1**).
+- For **Scenario-3** (see Motivation), we should do these two things:
+  - Provide a way to guarantee that the attributes of the Cursor and the attributes of the Dispatcher are modified atomically (referred to as **Goal-2**).
+  - Prevent more than one **Occasional Read Position Change** from being executed at the same time (the current proposal does not consider this optimization; there will be some bug fixes after this PIP).
+- Revert https://github.com/apache/pulsar/issues/6403
+- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup>
+
+# High-Level Design
+
+**For the Goal-1**: Cursor Epoch
+- Define a counter that records how many times the `read position` of the cursor has been changed by an **Occasional Read Position Change**; we call it the Cursor Epoch. Each time an **Occasional Read Position Change** is executed, the Cursor Epoch increases by one.
+- Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch.
+- When a reading completes, discard it and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch.
+
+**For the Goal-2**: Two-phase Cursor Epoch increment
+- Mark the Cursor Epoch increase task as started.
+  - Set a new `read position`.
+  - Mark the cursor as modifying; from this moment, all **Sequentially Read** and **Message Replay Read** operations will fail with `CursorModifyingException`.
+- We can change the attributes of the Cursor or the attributes of the Dispatcher now.
+- End the Cursor Epoch increase task.
+  - Increase the epoch.
+  - Remove the `modifying` marker.
+
+# Detailed Design
+
+**ManagedLedgerException.java**
+```java
+public static class CursorModifyingException extends ManagedLedgerException {}
+
+public static class CursorEpochConflictException extends ManagedLedgerException {}
+```
+
+<strong>ManagedCursorImpl.java</strong> (maybe replace `synchronized` with a `lock`)

Review Comment:
   Good suggestion. I have already added a new component, `CursorEpoch.java`.
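
The Cursor Epoch check (Goal-1) and the two-phase epoch increment (Goal-2) described in the High-Level Design can be illustrated with a small, self-contained sketch. It is not the `CursorEpoch.java` from the PR: the class `CursorEpochSketch`, its method names, and the use of a single `long` as the read position (instead of `ledgerId:entryId`) are all hypothetical, and the exception here is only a stand-in for the proposed `CursorModifyingException`. Two behaviours are assumptions that the proposal text does not spell out: a read that completes while a change is in progress is discarded, and a second concurrent position change is rejected.

```java
final class CursorEpochSketch {
    /** Hypothetical stand-in for the proposal's CursorModifyingException. */
    static final class CursorModifyingException extends RuntimeException {}

    private long epoch = 0;            // the Cursor Epoch
    private boolean modifying = false; // the "modifying" marker of the two-phase change
    private long readPosition;         // simplified: one number instead of ledgerId:entryId

    CursorEpochSketch(long initialReadPosition) {
        this.readPosition = initialReadPosition;
    }

    /** Starts a read; the returned epoch must be passed back to completeRead. */
    synchronized long startRead() {
        if (modifying) {
            throw new CursorModifyingException();
        }
        return epoch;
    }

    /** Returns true if the result may be delivered, false if it must be discarded. */
    synchronized boolean completeRead(long readEpoch, long nextReadPosition) {
        // Assumption: a read finishing in the middle of a change is discarded as well.
        if (modifying || readEpoch < epoch) {
            return false; // stale read: leave the read position untouched
        }
        readPosition = nextReadPosition;
        return true;
    }

    /** Phase 1: set the new read position and mark the cursor as modifying. */
    synchronized void beginPositionChange(long newReadPosition) {
        if (modifying) {
            // Assumption: concurrent changes are rejected; the proposal leaves this open.
            throw new IllegalStateException("another position change is in progress");
        }
        modifying = true;
        readPosition = newReadPosition;
    }

    /** Phase 2: increase the epoch and remove the modifying marker. */
    synchronized void endPositionChange() {
        if (!modifying) {
            throw new IllegalStateException("no position change in progress");
        }
        epoch++;
        modifying = false;
    }

    synchronized long readPosition() {
        return readPosition;
    }

    public static void main(String[] args) {
        // Replays Scenario-2 with simplified positions: a read is in flight
        // while the read position is reset from 5 to 0.
        CursorEpochSketch cursor = new CursorEpochSketch(5);
        long readEpoch = cursor.startRead();
        cursor.beginPositionChange(0);
        // Dispatcher attributes (for example the replay queue) would be changed here.
        cursor.endPositionChange();
        boolean delivered = cursor.completeRead(readEpoch, 10);
        System.out.println("stale read delivered: " + delivered
                + ", read position: " + cursor.readPosition());
        // prints "stale read delivered: false, read position: 0"
    }
}
```

In this sketch every method is `synchronized`, which mirrors the `synchronized` versus `lock` question raised in the review; a real implementation would likely need finer-grained coordination with the Dispatcher.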
