poorbarcode commented on code in PR #20469:
URL: https://github.com/apache/pulsar/pull/20469#discussion_r1241052256


##########
pip/pip-269.md:
##########
@@ -0,0 +1,242 @@
+# Background knowledge
+
+The Cursor is used to read data from ledgers. There are three scenarios for reading messages (we call these scenarios Read-Scenarios):
+- **Sequential Read**: Read entries starting from the `read position` (inclusive). After the read completes, set the `read position` to the position after the last entry of this read, then start the next round of **Sequential Read**. Note: besides delivering messages to consumers, some internal APIs also use **Sequential Read**, such as `Compaction`, `TopicTransactionBufferRecover`, `Replicator`, and so on.
+- **Message Replay Read**: Normally consumers acknowledge all the messages they receive, but sometimes a consumer cannot handle some messages and asks the broker to redeliver them (these messages are always earlier than the `read position`). E.g. calling `consumer.negativeAcknowledge`, or closing a consumer that still holds some messages, which are then redelivered to other consumers. The Message Dispatcher caches the positions of the messages that should be redelivered (we call this cache the **Replay Queue**), and in the next round the Cursor reads the messages cached in it. In general, **Message Replay Read** does not modify the `read position`, except in the scenario **A compensation mechanism of Message Replay Read** described below.
+- **Direct read** (reading directly through the Managed Ledger): some APIs read messages at a specified position and generally do not affect the attributes of the Cursor or the Message Dispatcher. E.g.
+  - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages)
+  - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id)
+
+Summary: The `read position` of the Cursor is a marker. Reads at or above this marker are **Sequential Read**, reads below it are **Message Replay Read** (these messages have already been read once by **Sequential Read**), and **Direct read** ignores this marker (in fact, **Direct read** does not involve the Cursor at all).
+
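A minimal Java sketch of the rule in the summary, assuming positions compare by ledger ID first and entry ID second. `Position`, `ReadKind` and `ReadClassifier` are hypothetical names used only for illustration, not Pulsar APIs; **Direct read** is left out because it does not go through the Cursor.

```java
/** Hypothetical position `ledgerId:entryId`, ordered by ledger first, then by entry. */
record Position(long ledgerId, long entryId) implements Comparable<Position> {
    @Override
    public int compareTo(Position other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

/** The two Read-Scenarios that are decided by the Cursor's `read position`. */
enum ReadKind { SEQUENTIAL_READ, MESSAGE_REPLAY_READ }

final class ReadClassifier {
    private ReadClassifier() {}

    /** Reads at or above the read position are Sequential Reads; reads below it are replays. */
    static ReadKind classify(Position readPosition, Position target) {
        return target.compareTo(readPosition) >= 0
                ? ReadKind.SEQUENTIAL_READ
                : ReadKind.MESSAGE_REPLAY_READ;
    }
}
```

With a `read position` of `3:5`, a read of `3:5` or `4:0` is classified as **Sequential Read**, while `3:4` or `2:100` is a **Message Replay Read**.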
+The Cursor also maintains information about acknowledged messages (to avoid delivering them repeatedly). It uses two attributes to record which messages were acknowledged:
+- `individual Acknowledged messages` (non-contiguous acknowledgments).
+- `mark deleted position`: all messages up to this position have been acknowledged. The Cursor advances it when the message right after it is acknowledged.
+
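A single-ledger sketch of how the two attributes could work together, assuming entry IDs are plain `long`s and that the `mark deleted position` itself counts as acknowledged (inclusive). `AckState` is a hypothetical class for illustration, not the real `ManagedCursorImpl`, which tracks `ledgerId:entryId` positions.

```java
import java.util.TreeSet;

/** Hypothetical model: markDeleted is inclusive, individuallyAcked holds non-contiguous acks. */
final class AckState {
    private long markDeleted;                                          // mark deleted position
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();   // individual acks above it

    AckState(long markDeleted) {
        this.markDeleted = markDeleted;
    }

    void acknowledge(long entryId) {
        if (entryId <= markDeleted) {
            return; // already covered by the mark deleted position
        }
        individuallyAcked.add(entryId);
        // Advance the mark deleted position over the contiguous run of acknowledged entries.
        while (individuallyAcked.remove(markDeleted + 1)) {
            markDeleted++;
        }
    }

    boolean isAcknowledged(long entryId) {
        return entryId <= markDeleted || individuallyAcked.contains(entryId);
    }

    long markDeleted() {
        return markDeleted;
    }

    int individualCount() {
        return individuallyAcked.size();
    }
}
```

Acknowledging `1` and `2` first leaves the mark deleted position at `-1` because `0` is still missing; acknowledging `0` then moves it to `2` and empties the individual set.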
+**Cursor Position Reset**: As described above, the `read position` and `mark deleted position` are very important to the Cursor. The `read position` is the watermark of reading, and **Sequential Read** expects the `read position` to be changed only by itself; the `mark deleted position` is the watermark of acknowledgment, and acknowledgment expects the `mark deleted position` to be changed only by itself. However, some cases also change the `read position` and `mark deleted position`, causing incorrect reads and incorrect changes to the `mark deleted position`. For example:
+- Reset the `mark deleted position` and `read position` to re-consume messages from a certain position (this can also be used to skip the messages before a certain position):
+  - [Admin API: pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor)
+  - [Client API: consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482)
+- Skip some messages (manually move the `mark deleted position` to a higher position, which may also change the `read position`):
+  - [Admin API: pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): Skip a certain number of messages.
+  - [Admin API: pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages): Skip all messages.
+- The internal components of Pulsar sometimes reset the `read position` to re-consume all unacknowledged messages (such as `Compactor`, `Replicator` and `Dispatcher`):
+  - [Internal API: cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>
+- Since reading compacted data is a special mode of consumption, instead of setting the `read position` to the position after the last entry of this read, it calls [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493) to set the `read position` to the next position that was not filtered out.
+  - [Internal API: cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)
+- There is a [compensation mechanism](https://github.com/apache/pulsar/issues/6403) of **Message Replay Read**: for some unknown reason, the rule "reads below the `read position` (exclusive) are **Message Replay Read**" is broken, causing the message at the `read position` to be put into the **Replay Queue**, which leads to repeated consumption (read once by **Message Replay Read** and once by **Sequential Read**). The [compensation mechanism](https://github.com/apache/pulsar/issues/6403) moves the `read position` of the Cursor to the next position to avoid repeated consumption (it does not fix the root cause; it is only a backstop).
+
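The backstop can be pictured with the following single-ledger sketch. `CompensationSketch` and `takeReplayBatch` are hypothetical names, and the sketch only models the rule stated above (if the entry at the `read position` shows up in the **Replay Queue**, move the `read position` to the next position); it is not the code from the linked issue.

```java
import java.util.List;
import java.util.TreeSet;

/** Hypothetical single-ledger model of the Replay Queue backstop described above. */
final class CompensationSketch {
    long readPosition;                                  // next entry for Sequential Read
    final TreeSet<Long> replayQueue = new TreeSet<>();  // positions to redeliver

    CompensationSketch(long readPosition) {
        this.readPosition = readPosition;
    }

    /** Drains the Replay Queue; applies the backstop if it contains the read position. */
    List<Long> takeReplayBatch() {
        List<Long> batch = List.copyOf(replayQueue); // ascending order
        replayQueue.clear();
        // The entry at `readPosition` should never be in the Replay Queue. If it is, move the
        // read position to the next position so Sequential Read does not deliver it again.
        if (batch.contains(readPosition)) {
            readPosition++;
        }
        return batch;
    }
}
```

The sketch makes the limitation visible: it hides the duplicate delivery but says nothing about why the entry reached the **Replay Queue** in the first place.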
+# Motivation
+
+If **Message Replay Read** and **Cursor Position Reset** are executed concurrently, the consumption will be out of order. For example:
+
+| time | **Cursor Position Reset** | **Message Replay Read** |
+| --- | --- | --- |
+| 1 | | Start to read `[3:5, 3:6]` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages | |
+| 4 | Set read position to `3:0` | |
+| 5 | | Read complete, and send `[3:5, 3:6]` to the client |
+
+In this scenario (referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0 ...]`.
+
+---
+
+If **Sequential Read** and **Cursor Position Reset** are executed concurrently, the consumption will be out of order, and consumption can also get stuck. For example:
+
+| time | **Cursor Position Reset** | **Sequential Read** |
+| --- | --- | --- |
+| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages | |
+| 4 | Set read position to `3:0` | |
+| 5 | | Read complete, set the read position to `3:10`, and send `[3:5 ~ 3:9]` to the client |
+
+In this scenario (referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first, and the messages `[3:0 ~ 3:4]` will not be delivered to the client until the topic is unloaded.
+
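Scenario-2 can be replayed deterministically with a hypothetical cursor that has no protection at all (single ledger `3`, so only entry IDs are tracked). `UnguardedCursor` and `PendingRead` are illustrative names, not Pulsar classes; the point is that the completion of the read unconditionally moves the `read position` past the last entry it read, overwriting the reset.

```java
import java.util.ArrayList;
import java.util.List;

/** A read that was issued but has not completed yet: entries [from, to]. */
record PendingRead(long from, long to) {}

/** Hypothetical cursor with no guard against a concurrent Cursor Position Reset. */
final class UnguardedCursor {
    long readPosition;

    UnguardedCursor(long readPosition) {
        this.readPosition = readPosition;
    }

    /** Time 1: start a Sequential Read of `entriesNum` entries from the read position. */
    PendingRead startRead(int entriesNum) {
        return new PendingRead(readPosition, readPosition + entriesNum - 1);
    }

    /** Time 4: Cursor Position Reset, e.g. reset-cursor or seek. */
    void reset(long position) {
        readPosition = position;
    }

    /** Time 5: the read completes, moves the read position and returns what is sent to the client. */
    List<Long> completeRead(PendingRead read) {
        List<Long> delivered = new ArrayList<>();
        for (long entryId = read.from(); entryId <= read.to(); entryId++) {
            delivered.add(entryId);
        }
        readPosition = read.to() + 1; // silently overwrites the reset done at time 4
        return delivered;
    }
}
```

Running the steps in table order (`startRead(5)` at entry `5`, `reset(0)`, then `completeRead(...)`) delivers entries `5` to `9` and leaves the read position at `10`, so entries `0` to `4` are never read by **Sequential Read** again.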
+---
+
+When **Cursor Position Reset** is executed, the attributes of the Cursor (including the `read position`) are changed (`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher are changed too (`action 2`)<sup>[3]</sup>. If these two actions are not executed atomically, the attributes of the Cursor become inconsistent with the attributes of the Dispatcher, which causes problems. For example:
+
+| time | **Reset cursor** | **Consumer reconnect** |
+| --- | --- | --- |
+| 1 | disconnect all consumers | |
+| 2 | | remove consumer |
+| 3 | | add consumer_1 |
+| 4 | | rewind cursor to `3:0` |
+| 5 | | add consumer_2 |
+| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> |
+| 7 | set read position to `1:0` | |
+
+In this scenario (referred to as **Scenario-3**), `consumer_2` will be stuck by the Recently Joined Consumers mechanism of Key_Shared mode<sup>[4]</sup>. This looks like a simple problem, but it gets more complicated in the following cases:
+- There is an in-flight read.
+- More than one **Cursor Position Reset** action is running.
+- Both of the above.
+
+---
+
+In the Multi Consumer Dispatcher, there is a mechanism that tries to fix the issue caused by `cursor.rewind` and **Sequential Read** executing concurrently (referred to as Rewind After In-flight Reading). It works like this: if there is an in-flight read when `cursor.rewind` is called, the rewind is deferred until the in-flight read is finished. However, this mechanism has the following drawbacks:
+- It introduces a new issue<sup>[5]</sup>.
+- It only applies to `cursor.rewind`.
+- It only applies to the Multi Consumer Dispatcher.
+- If there is more than one in-flight read (one Sequential Read and some Message Replay Reads might exist simultaneously), this mechanism does not work.
+
+# Goals
+**Sub-Goal-1**: when a **Cursor Position Reset** occurs, reject new reads and discard in-flight reads and half in-flight reads<sup>[6]</sup>, while taking care of these two things:
+- Ensure that changes to the `read position` and `mark deleted position` are thread-safe.
+- Since more than one component uses the Cursor to do **Sequential Read** or **Message Replay Read**, and these components may also have their own attributes that need to be modified atomically, we should also provide a mechanism that guarantees atomic modification of both the attributes of the Cursor and the attributes of other components.
+
+**Sub-Goal-2**: Remove some patches (referred to as **Goal-3**):
+  - Revert https://github.com/apache/pulsar/issues/6403 (referred to above as "A compensation mechanism of **Message Replay Read**")
+  - Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup>
+
+**Out of scope**: Preventing more than one **Cursor Position Reset** from being executed at the same time (the current proposal does not consider this optimization; there will be some bug fixes after this PIP).
+
+# High-Level Design
+
+### New concepts
+- **Cursor position is resetting**: indicates whether a **Cursor Position Reset** is running. While it is `true`, reject new reads and discard in-flight reads.
+- **Cursor Epoch**: a counter recording how many times **Cursor Position Reset** has been executed. If it has increased since an in-flight read started, discard that read.
+  - This attribute helps prevent the following error: a read is started (`${Cursor position is resetting}` is `false`); a **Cursor Position Reset** is executed; the read is completed (`${Cursor position is resetting}` is `false` again). Checking the flag alone cannot detect that this read is stale.
+
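A minimal sketch of the epoch check, assuming a single ledger and a Cursor Position Reset that runs atomically under the cursor's lock (the flag-based protocol is left out here). `EpochCursor` and `EpochRead` are hypothetical names, not part of the proposed API: each read records the epoch it started in, and its result is discarded if the epoch has moved on by the time it completes.

```java
/** A read that remembers the Cursor Epoch observed when it was started. */
record EpochRead(long from, long to, long epoch) {}

/** Hypothetical cursor that uses a Cursor Epoch to detect reads that raced with a reset. */
final class EpochCursor {
    private final Object lock = new Object();
    private long readPosition;
    private long epoch; // incremented by every Cursor Position Reset

    EpochCursor(long readPosition) {
        this.readPosition = readPosition;
    }

    EpochRead startRead(int entriesNum) {
        synchronized (lock) {
            return new EpochRead(readPosition, readPosition + entriesNum - 1, epoch);
        }
    }

    /** Returns true if the result may be delivered, false if it must be discarded. */
    boolean completeRead(EpochRead read) {
        synchronized (lock) {
            if (read.epoch() != epoch) {
                return false; // a reset happened while the read was in flight
            }
            readPosition = read.to() + 1;
            return true;
        }
    }

    void reset(long position) {
        synchronized (lock) {
            readPosition = position;
            epoch++;
        }
    }

    long readPosition() {
        synchronized (lock) {
            return readPosition;
        }
    }
}
```

Replaying Scenario-2 against this cursor, the stale read is rejected at completion and the `read position` stays at the reset value.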
+### Guarantee safe changes
+- Inside Cursor: changes to the attributes `read position`, `mark deleted position`, `cursor epoch`, and `cursor position is resetting` use the same lock.
+- Outside Cursor:
+  - The action **Cursor Position Reset** updates `${Cursor position is resetting}` first, which guarantees that no other **Cursor Position Reset** event can reenter (fail fast) and that new reads are rejected (in other words, it works like a non-reentrant lock).
+  - Each component (such as Dispatcher, Replicator...) modifies its own attributes (each component decides whether it needs a lock).
+  - End the change: revert `${Cursor position is resetting}`; update the `read position` and `mark deleted position`; increase **Cursor Epoch** (this prevents the error where a read is started, a **Cursor Position Reset** is executed, and then the read is completed).
+  
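The steps above can be sketched in Java as follows, assuming a single ledger and that a Cursor Position Reset is driven by one caller that passes in a callback for its own attributes. `ResettableCursor`, `GuardedRead` and `resetTo` are hypothetical names for illustration, not the API proposed in the Detailed Design. If the callback throws, this sketch only clears the flag; the proposal does not specify that case, so it is an assumption.

```java
import java.util.Optional;

/** A read that remembers the Cursor Epoch observed when it was started. */
record GuardedRead(long from, long to, long epoch) {}

/** Hypothetical cursor: one lock guards both positions, the Cursor Epoch and the resetting flag. */
final class ResettableCursor {
    private final Object lock = new Object();
    private long markDeletedPosition;
    private long readPosition;
    private long epoch;
    private boolean resetting;

    ResettableCursor(long markDeletedPosition, long readPosition) {
        this.markDeletedPosition = markDeletedPosition;
        this.readPosition = readPosition;
    }

    /** New reads are rejected (empty) while a Cursor Position Reset is running. */
    Optional<GuardedRead> startRead(int entriesNum) {
        synchronized (lock) {
            if (resetting) {
                return Optional.empty();
            }
            return Optional.of(new GuardedRead(readPosition, readPosition + entriesNum - 1, epoch));
        }
    }

    /** In-flight reads are discarded if a reset is running or has completed since they started. */
    boolean completeRead(GuardedRead read) {
        synchronized (lock) {
            if (resetting || read.epoch() != epoch) {
                return false;
            }
            readPosition = read.to() + 1;
            return true;
        }
    }

    /**
     * Returns false (fail fast) if another reset is already running. `componentUpdate` lets the
     * owning component (Dispatcher, Replicator, ...) change its own attributes in between.
     */
    boolean resetTo(long newMarkDeletedPosition, long newReadPosition, Runnable componentUpdate) {
        synchronized (lock) {
            if (resetting) {
                return false;
            }
            resetting = true;
        }
        boolean updated = false;
        try {
            componentUpdate.run(); // e.g. clear the Replay Queue; the component picks its own locking
            updated = true;
        } finally {
            synchronized (lock) {
                if (updated) { // end the change: new positions and a new epoch
                    markDeletedPosition = newMarkDeletedPosition;
                    readPosition = newReadPosition;
                    epoch++;
                }
                resetting = false; // on failure only the flag is reverted
            }
        }
        return true;
    }

    long readPosition() { synchronized (lock) { return readPosition; } }

    long markDeletedPosition() { synchronized (lock) { return markDeletedPosition; } }
}
```

Running `componentUpdate` outside the cursor lock mirrors the text above: the flag already excludes concurrent resets and new reads, and each component decides for itself whether its own attributes need a lock.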
+# Detailed Design
+
+**ManagedLedgerException.java**
+```java
+/** If a read is rejected because of a Cursor Position Reset event, it gets a CursorModifyingException. */

Review Comment:
   Fixed


