poorbarcode commented on code in PR #20469:
URL: https://github.com/apache/pulsar/pull/20469#discussion_r1226094754


##########
pip/pip-269.md:
##########
@@ -0,0 +1,239 @@
+# Background knowledge
+
+The Cursor is used to read data from ledgers, there are three scenarios for 
reading messages(we can call these scenarios Read-Scenario):
+- **Sequentially Read**: reading some entries backward from the `read 
position` and updating the read position to the next position to be read when 
reading is finished.
+    - Read entries from `read position`(include), after the read is complete, 
set `read position` to the next position of the last entry of this read, then 
the next round of **Sequentially Read**.
+- **Message Replay Read**: Normally consumers will acknowledge messages all 
they received, but sometimes the consumer can not handle messages and asks the 
broker to redeliver these messages(these messages are always earlier than `read 
position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and 
there are some messages held by this consumer, these messages will be 
redelivered to other consumers. Message Dispatcher will cache the position of 
messages which should be redelivered, and Cursor will read messages which were 
cached in the memory of Message Dispatcher in the next round.
+- **Specified Read**(Read directly using Managed Ledger): there are some APIs 
that can read messages at a specified position, but generally do not affect the 
attributes of the Cursor or Message Dispatcher. E.g.
+    -  [`pulsar-admin topics 
examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages)
+    - [`pulsar-admin topics 
get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id)
+
+**Occasional Read Position Change**: As described above, the `read position` 
is very important to **Sequentially Read**, **Message Replay Read** expects the 
`read position` always be changed serialized by itself. There have some cases 
that will also change the `read position` of the Cursor, causing the Incorrect 
**Sequentially Read** and Incorrect **Message Replay Read**. For example:
+- [pulsar-admin topics 
reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor):
  set `read position` to a specified value.
+- 
[consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482):
 same as [pulsar-admin topics 
reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor),
 but it used by the Pulsar Client.
+- [pulsar-admin topics 
skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): 
skip many messages.
+- [pulsar-admin topics 
clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages)
+- 
[cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it
 is an internal API): set the `read position` to the next of `mark deleted 
position.`
+- 
[cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it
 is an internal API): just set the `read position` to a specified point, it is 
only used for Compaction now.
+- Other read position edit:
+    -  A compensation mechanism: Something wrong makes there a position the 
same as `read position` in the replay queue, 
https://github.com/apache/pulsar/issues/6403 provides an additional mechanism 
to increase the `read position` before reading to avoid repeated read(read by 
Message Replay Read once and read by Sequentially Read once).
+
+# Motivation
+
+If **Message Replay Read** and **Occasional Read Position Change** are 
executed concurrently, the consumption will be out of order. For example:
+| time | **Occasional Read Position Change** | **Message Replay Read** |
+| --- | --- | --- |
+| 1 | | Start to read `[3:5, 3:6]` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages |
+| 4 | Set read position to `3:0` |
+| 5 | | Read complete, and send `[3:5, 3:6]` to the client |
+
+In this scenario(referred to as **Scenario-1**), the consumer will receive 
`[3:5, 3:6]` first and then receive `[3:0.....]`
+
+---
+
+If **Sequentially Read** and **Occasional Read Position Change** are executed 
concurrently, the consumption will be out of order,  also leading to 
consumption being stuck. For example:
+| time | **Occasional Read Position Change** | **Sequentially Read** |
+| --- | --- | --- |
+| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages |
+| 4 | Set read position to `3:0` |
+| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` 
to the client |
+
+In this scenario(referred to as **Scenario-2**), the consumer will receive 
`[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the 
client until unloading the topic.
+
+---
+
+In **Occasional Read Position Change** is executed, the attributes(including 
`read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and 
the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. 
If these two actions are not executed atomicity, this will cause the attributes 
of Cursorto to be inconsistent with the attributes of Disptacher, and cause 
some problems. For example:
+| time | **Reset cursor** | **Consumer reconnect** |
+| --- | --- | --- |
+| 1 | disconnect all consumers |
+| 2 | | remove consumer |
+| 3 | | add consumer_1 |
+| 4 | | rewind cursor to `3:0` |
+| 5 | | add consumer_2 |
+| 6 | | add consumer_2 to recently joined consumers, `max read position` is 
`3:0`<sup>[4]</sup> |
+| 7 | set read position to `1:0` |
+
+In this scenario(referred to as **Scenario-3**), the `consumer_2` will be 
stuck by the mechanism Recently Joined Consumers of Key_Shared 
mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that:
+- If there has an in-flight reading.
+- If there is more than one **Occasional Read Position Change** action running.
+- Both above.
+
+---
+
+In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the 
issue caused by the action `cursor.rewind` and **Sequentially Read** executed 
concurrently(referred to as Rewind After In-flight Reading). It works like 
this: if there has an in-flight reading when calling `cursor.rewind`, just do 
rewind after the in-flight reading is finished.` but this mechanism has the 
following drawbacks:
+- Makes a new issue lead to a new issue<sup>[5]</sup>.
+- Only applicable to `cursor.rewind`.
+- Only applicable to Multi Consumer Dispatcher.
+- If there is more than one in-flight reading(one Sequentially Read and some 
Message Replay Read might exist simultaneously), this mechanism can not work.
+
+# Goals
+- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the 
in-flight reading(referred to as **Goal-1**).
+- For the **Scenario-3**(see Motivation), we should do these two things:
+    - Provide a way to guarantee the atomicity modification of both attributes 
of Cursor and attributes of Dispatcher(referred to as **Goal-2**).
+    - Prevent more than one **Occasional Read Position Change** executed at 
the same time(Current Proposal does not consider this optimization, there will 
be some bug fixes after this PIP)
+- Revert https://github.com/apache/pulsar/issues/6403
+- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup>
+
+# High-Level Design
+
+**For the Goal-1**: Cursor Epoch

Review Comment:
   > Tell me if I'm correct
   
   Correct.
   
   > Rename variable. Epoch in computer science is mostly for how many seconds 
have passed since 1970-01-01. See 
https://en.wikipedia.org/wiki/Epoch_(computing)
   
   Pulsar and Zookeeper have similar concepts named epoch, so `epoch` may be 
easier to understand? I have also corrected the document: `epoch` is also 
related to the change of `mark deleted position`(in other words, if the `mark 
deleted position` is changed by Cursor Reset, the `epoch` also increases)
   
   > Checking count has increased and if not update read position is not fully 
safe. Better guard update read position with a lock, since you're taking an 
action based on the count. I would take a lock on read position, and in that 
lock update count if needed and read position if needed.
   
   Yes, this is consistent with the current design, the original document may 
not be clearly enough, I improved it
   
   > I wouldn't use an exception for flow control. It's an expensive operation, 
especially for latency sensitive systems. Return value like Result of some sort 
- can we do that?
   
   Good suggestion, already instead of `return -1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to