poorbarcode commented on code in PR #20469: URL: https://github.com/apache/pulsar/pull/20469#discussion_r1241052256
########## pip/pip-269.md: ##########
@@ -0,0 +1,242 @@

# Background knowledge

The Cursor is used to read data from ledgers. There are three scenarios for reading messages (we call them Read-Scenarios):
- **Sequential Read**: Read entries starting from the `read position` (inclusive). After the read completes, set the `read position` to the position following the last entry of this read, then start the next round of **Sequential Read**. Note: besides delivering messages to consumers, some internal APIs also use **Sequential Read**, such as `Compaction`, `TopicTransactionBufferRecover`, `Replicator`, and so on.
- **Message Replay Read**: Normally consumers acknowledge all the messages they receive, but sometimes a consumer cannot handle some messages and asks the broker to redeliver them (these messages are always earlier than the `read position`). E.g., calling `consumer.negativeAcknowledge`, or closing a consumer that still holds some messages, which are then redelivered to other consumers. The Message Dispatcher caches the positions of the messages that should be redelivered (we call this cache the **Replay Queue**), and the Cursor reads these messages in the next round. In general, **Message Replay Read** does not modify the `read position`, except in the scenario **A compensation mechanism of Message Replay Read** described below.
- **Direct read** (reading directly via the Managed Ledger): some APIs read messages at a specified position and generally do not affect the attributes of the Cursor or the Message Dispatcher. E.g.:
  - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages)
  - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id)

Summary: The `read position` of the Cursor is a marker. Reads at or above this marker are **Sequential Read**, reads below it are **Message Replay Read** (these messages have already been read once by **Sequential Read**), and **Direct read** ignores this marker (in fact, **Direct read** does not involve the Cursor at all).

The Cursor also records which messages have been acknowledged (to avoid reading them repeatedly). It uses two attributes:
- `individual Acknowledged messages` (non-contiguous acknowledgments).
- `mark deleted position`: all messages up to this position have been acknowledged. The Cursor advances it when the first unacknowledged message after it is acknowledged.

**Cursor Position Reset**: As described above, the `read position` and `mark deleted position` are very important to the Cursor. The `read position` is the watermark of reading, and **Sequential Read** expects to be the only one that changes it; the `mark deleted position` is the watermark of acknowledgment, and acknowledgment expects to be the only one that changes it. However, some other cases also change the `read position` and `mark deleted position`, which can cause incorrect reads and incorrect changes of the `mark deleted position`.
For example:
- Reset the `mark deleted position` and `read position` to re-consume messages from a certain position (this can also be used to skip the messages before a certain position):
  - [Admin API: pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor)
  - [Client API: consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482)
- Skip some messages (manually moves the `mark deleted position` forward, and possibly changes the `read position`):
  - [Admin API: pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip a certain number of messages.
  - [Admin API: pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages): skip all messages.
- Internal components of Pulsar (such as `Compactor`, `Replicator`, and `Dispatcher`) sometimes reset the `read position` to re-consume all unacknowledged messages:
  - [Internal API: cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>
- Read compacted is a special mode of consumption: instead of setting the `read position` to the position following the last entry of the read, it calls [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493) to set the `read position` to the next position that was not filtered out:
  - [Internal API: cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)
- There is a [compensation mechanism](https://github.com/apache/pulsar/issues/6403) of **Message Replay Read**: for some unknown reason, the rule "reads below the `read position` (exclusive) are **Message Replay Read**" is sometimes broken, and the message at exactly the `read position` is put into the **Replay Queue**. This leads to repeated consumption (the message is read once by **Message Replay Read** and once by **Sequential Read**). The [compensation mechanism](https://github.com/apache/pulsar/issues/6403) moves the `read position` of the Cursor to the next position to avoid the repeated consumption (it does not fix the root cause; it is only a backstop).

# Motivation

If **Message Replay Read** and **Cursor Position Reset** are executed concurrently, the consumption will be out of order. For example:

| time | **Cursor Position Reset** | **Message Replay Read** |
| --- | --- | --- |
| 1 | | Start to read `[3:5, 3:6]` |
| 2 | | Waiting for the response from BK |
| 3 | Clear redelivery messages | |
| 4 | Set read position to `3:0` | |
| 5 | | Read completes; send `[3:5, 3:6]` to the client |

In this scenario (referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0, ...]`.

---

If **Sequential Read** and **Cursor Position Reset** are executed concurrently, the consumption will be out of order and can also get stuck.
For example:

| time | **Cursor Position Reset** | **Sequential Read** |
| --- | --- | --- |
| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` |
| 2 | | Waiting for the response from BK |
| 3 | Clear redelivery messages | |
| 4 | Set read position to `3:0` | |
| 5 | | Read completes; set the read position to `3:10` and send `[3:5 ~ 3:9]` to the client |

In this scenario (referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first, and the messages `[3:0 ~ 3:4]` will not be delivered to the client until the topic is unloaded.

---

When a **Cursor Position Reset** is executed, the attributes of the Cursor (including the `read position`) are changed (`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher are changed too (`action 2`)<sup>[3]</sup>. If these two actions are not executed atomically, the attributes of the Cursor can become inconsistent with those of the Dispatcher, which causes problems. For example:

| time | **Reset cursor** | **Consumer reconnect** |
| --- | --- | --- |
| 1 | disconnect all consumers | |
| 2 | | remove consumer |
| 3 | | add consumer_1 |
| 4 | | rewind cursor to `3:0` |
| 5 | | add consumer_2 |
| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> |
| 7 | set read position to `1:0` | |

In this scenario (referred to as **Scenario-3**), `consumer_2` will be stuck by the Recently Joined Consumers mechanism of the Key_Shared mode<sup>[4]</sup>. The case itself looks simple, but it becomes more complicated when:
- there is an in-flight reading;
- more than one **Cursor Position Reset** is running;
- both of the above.

---

The Multi Consumer Dispatcher has a mechanism that tries to fix the issue caused by `cursor.rewind` and **Sequential Read** being executed concurrently (referred to as Rewind After In-flight Reading).
It works like this: if there is an in-flight reading when `cursor.rewind` is called, the rewind is deferred until the in-flight reading finishes. However, this mechanism has the following drawbacks:
- It introduces a new issue<sup>[5]</sup>.
- It only applies to `cursor.rewind`.
- It only applies to the Multi Consumer Dispatcher.
- It does not work if there is more than one in-flight reading (one **Sequential Read** and several **Message Replay Read**s might exist simultaneously).

# Goals

**Sub-Goal-1**: when a **Cursor Position Reset** occurs, reject new readings and discard in-flight and half in-flight readings<sup>[6]</sup>. Two things need care:
- Ensure that changes to the `read position` and `mark deleted position` are thread-safe.
- More than one component uses the Cursor to do **Sequential Read** or **Message Replay Read**, and these components may have their own attributes that need to be modified atomically, so we should also provide a mechanism that guarantees the attributes of the Cursor and those of other components are modified atomically together.

**Sub-Goal-2**: Remove some patches (referred to as **Goal-3**):
- Revert https://github.com/apache/pulsar/issues/6403 (named "A compensation mechanism of **Message Replay Read**" above).
- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup>.

**Out of scope**: preventing more than one **Cursor Position Reset** from executing at the same time (the current proposal does not consider this optimization; there will be some bug fixes after this PIP).

# High-Level Design

### New concepts
- **Cursor position is resetting**: indicates whether a **Cursor Position Reset** is running. If it is `true`, reject new readings and discard in-flight readings.
- **Cursor Epoch**: a counter that records how many times **Cursor Position Reset** has been executed. If it has increased by the time an in-flight reading completes, discard this reading.
  - This attribute helps prevent the following error: a reading starts (`${Cursor position is resetting}` is `false`); a **Cursor Position Reset** is executed; the reading completes (`${Cursor position is resetting}` is `false` again), so the flag alone cannot tell that the reading is stale.

### Guarantee safe changes
- Inside the Cursor: changes to the attributes `read position`, `mark deleted position`, `cursor epoch`, and `cursor position is resetting` use the same lock.
- Outside the Cursor:
  - The **Cursor Position Reset** action updates `${Cursor position is resetting}` first. This guarantees that no other **Cursor Position Reset** can re-enter (it fails fast) and that new readings are rejected (in other words, it works like a non-reentrant lock).
  - Each component (such as the Dispatcher, the Replicator, ...) modifies its own attributes (each component decides whether it needs a lock).
  - End the change: revert `${Cursor position is resetting}`; update the `read position` and `mark deleted position`; increase the **Cursor Epoch** (this helps prevent the following error: a reading starts; a **Cursor Position Reset** is executed and increases `${Cursor Epoch}`; the reading completes).

# Detailed Design

**ManagedLedgerException.java**
```java
/** If a reading was rejected by the event Cursor Position Reset, it will get a CursorModifyingException. **/
```

Review Comment:
Fixed
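To make the High-Level Design in the PIP text above concrete, here is a minimal Java sketch of the state it describes: a resetting flag and an epoch guarded by the same lock as the two positions. It is a hypothetical, simplified model, not Pulsar's `ManagedCursorImpl`: the class and method names (`EpochGuardedCursor`, `startRead`, `beginReset`, `endReset`, ...) are invented for illustration, positions are plain `long`s instead of `ledgerId:entryId` pairs, and a rejected or stale read is reported as an empty `OptionalLong` or `false` rather than as a `CursorModifyingException`.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified model of the PIP-269 High-Level Design; NOT Pulsar's ManagedCursorImpl.
 * Positions are plain longs instead of ledgerId:entryId pairs. Every public method is
 * synchronized, so all four attributes are guarded by the same lock (this object's monitor).
 */
class EpochGuardedCursor {
    private long readPosition;
    private long markDeletedPosition;
    private long epoch;        // "Cursor Epoch": number of completed resets
    private boolean resetting; // "Cursor position is resetting"

    EpochGuardedCursor(long readPosition) {
        this.readPosition = readPosition;
        this.markDeletedPosition = readPosition - 1;
    }

    /** Starts a read: returns the epoch to remember, or empty (read rejected) while a reset runs. */
    synchronized OptionalLong startRead() {
        return resetting ? OptionalLong.empty() : OptionalLong.of(epoch);
    }

    /** A read is stale if a reset is running or one finished after it started; caller holds the monitor. */
    private boolean isStale(long startEpoch) {
        return resetting || epoch != startEpoch;
    }

    /** Completes a Sequential Read; a stale read is discarded and readPosition is left untouched. */
    synchronized boolean completeSequentialRead(long startEpoch, long lastEntryRead) {
        if (isStale(startEpoch)) {
            return false;
        }
        readPosition = lastEntryRead + 1; // next position after the last entry read
        return true;
    }

    /** Completes a Message Replay Read; it never moves readPosition, it is only validated. */
    synchronized boolean completeReplayRead(long startEpoch) {
        return !isStale(startEpoch);
    }

    /** Begins a reset; fails fast (no re-entry) if another reset is already running. */
    synchronized boolean beginReset() {
        if (resetting) {
            return false;
        }
        resetting = true;
        return true;
    }

    /** Ends a reset: clear the flag, apply the new positions and bump the epoch under one lock. */
    synchronized void endReset(long newReadPosition, long newMarkDeletedPosition) {
        resetting = false;
        readPosition = newReadPosition;
        markDeletedPosition = newMarkDeletedPosition;
        epoch++;
    }

    synchronized long readPosition() {
        return readPosition;
    }

    synchronized long markDeletedPosition() {
        return markDeletedPosition;
    }
}
```

Replaying **Scenario-2** against this sketch: a read starts at position 5 in epoch 0; a reset to position 0 begins (a second `beginReset()` fails fast and a new `startRead()` is rejected) and then ends, bumping the epoch to 1. When the old read completes, `completeSequentialRead(0, 9)` returns `false`, so the `read position` stays at 0 instead of jumping past `[5 ~ 9]`. Between `beginReset()` and `endReset()`, the owning component (e.g., a Dispatcher) would update its own attributes, such as clearing its **Replay Queue**; the sketch leaves that step to the caller.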
