poorbarcode commented on code in PR #20469:
URL: https://github.com/apache/pulsar/pull/20469#discussion_r1223404806


##########
pip/pip-269.md:
##########
@@ -0,0 +1,239 @@
+# Background knowledge
+
+The Cursor is used to read data from ledgers, there are three scenarios for 
reading messages(we can call these scenarios Read-Scenario):
+- **Sequentially Read**: reading some entries backward from the `read 
position` and updating the read position to the next position to be read when 
reading is finished.
+    - Read entries from `read position`(include), after the read is complete, 
set `read position` to the next position of the last entry of this read, then 
the next round of **Sequentially Read**.
+- **Message Replay Read**: Normally consumers will acknowledge messages all 
they received, but sometimes the consumer can not handle messages and asks the 
broker to redeliver these messages(these messages are always earlier than `read 
position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and 
there are some messages held by this consumer, these messages will be 
redelivered to other consumers. Message Dispatcher will cache the position of 
messages which should be redelivered, and Cursor will read messages which were 
cached in the memory of Message Dispatcher in the next round.
+- **Specified Read**(Read directly using Managed Ledger): there are some APIs 
that can read messages at a specified position, but generally do not affect the 
attributes of the Cursor or Message Dispatcher. E.g.
+    -  [`pulsar-admin topics 
examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages)
+    - [`pulsar-admin topics 
get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id)
+
+**Occasional Read Position Change**: As described above, the `read position` 
is very important to **Sequentially Read**, **Message Replay Read** expects the 
`read position` always be changed serialized by itself. There have some cases 
that will also change the `read position` of the Cursor, causing the Incorrect 
**Sequentially Read** and Incorrect **Message Replay Read**. For example:
+- [pulsar-admin topics 
reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor):
  set `read position` to a specified value.
+- 
[consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482):
 same as [pulsar-admin topics 
reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor),
 but it used by the Pulsar Client.
+- [pulsar-admin topics 
skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): 
skip many messages.
+- [pulsar-admin topics 
clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages)
+- 
[cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it
 is an internal API): set the `read position` to the next of `mark deleted 
position.`
+- 
[cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it
 is an internal API): just set the `read position` to a specified point, it is 
only used for Compaction now.
+- Other read position edit:
+    -  A compensation mechanism: Something wrong makes there a position the 
same as `read position` in the replay queue, 
https://github.com/apache/pulsar/issues/6403 provides an additional mechanism 
to increase the `read position` before reading to avoid repeated read(read by 
Message Replay Read once and read by Sequentially Read once).
+
+# Motivation
+
+If **Message Replay Read** and **Occasional Read Position Change** are 
executed concurrently, the consumption will be out of order. For example:
+| time | **Occasional Read Position Change** | **Message Replay Read** |
+| --- | --- | --- |
+| 1 | | Start to read `[3:5, 3:6]` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages |
+| 4 | Set read position to `3:0` |
+| 5 | | Read complete, and send `[3:5, 3:6]` to the client |
+
+In this scenario(referred to as **Scenario-1**), the consumer will receive 
`[3:5, 3:6]` first and then receive `[3:0.....]`
+
+---
+
+If **Sequentially Read** and **Occasional Read Position Change** are executed 
concurrently, the consumption will be out of order,  also leading to 
consumption being stuck. For example:
+| time | **Occasional Read Position Change** | **Sequentially Read** |
+| --- | --- | --- |
+| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages |
+| 4 | Set read position to `3:0` |
+| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` 
to the client |
+
+In this scenario(referred to as **Scenario-2**), the consumer will receive 
`[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the 
client until unloading the topic.
+
+---
+
+In **Occasional Read Position Change** is executed, the attributes(including 
`read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and 
the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. 
If these two actions are not executed atomicity, this will cause the attributes 
of Cursorto to be inconsistent with the attributes of Disptacher, and cause 
some problems. For example:
+| time | **Reset cursor** | **Consumer reconnect** |
+| --- | --- | --- |
+| 1 | disconnect all consumers |
+| 2 | | remove consumer |
+| 3 | | add consumer_1 |
+| 4 | | rewind cursor to `3:0` |
+| 5 | | add consumer_2 |
+| 6 | | add consumer_2 to recently joined consumers, `max read position` is 
`3:0`<sup>[4]</sup> |
+| 7 | set read position to `1:0` |
+
+In this scenario(referred to as **Scenario-3**), the `consumer_2` will be 
stuck by the mechanism Recently Joined Consumers of Key_Shared 
mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that:
+- If there has an in-flight reading.
+- If there is more than one **Occasional Read Position Change** action running.
+- Both above.
+
+---
+
+In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the 
issue caused by the action `cursor.rewind` and **Sequentially Read** executed 
concurrently(referred to as Rewind After In-flight Reading). It works like 
this: if there has an in-flight reading when calling `cursor.rewind`, just do 
rewind after the in-flight reading is finished.` but this mechanism has the 
following drawbacks:
+- Makes a new issue lead to a new issue<sup>[5]</sup>.
+- Only applicable to `cursor.rewind`.
+- Only applicable to Multi Consumer Dispatcher.
+- If there is more than one in-flight reading(one Sequentially Read and some 
Message Replay Read might exist simultaneously), this mechanism can not work.
+
+# Goals
+- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the 
in-flight reading(referred to as **Goal-1**).
+- For the **Scenario-3**(see Motivation), we should do these two things:
+    - Provide a way to guarantee the atomicity modification of both attributes 
of Cursor and attributes of Dispatcher(referred to as **Goal-2**).
+    - Prevent more than one **Occasional Read Position Change** executed at 
the same time(Current Proposal does not consider this optimization, there will 
be some bug fixes after this PIP)
+- Revert https://github.com/apache/pulsar/issues/6403
+- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup>
+
+# High-Level Design
+
+**For the Goal-1**: Cursor Epoch
+- Definition of a counter to record how many times the `read position` of the 
cursor has been changed by the **Occasional Read Position Change**, we call it 
Cursor Epoch. **Occasional Read Position Change** executed once, the Cursor 
Epoch increase once.
+- Every **Sequentially Read** or **Message Replay Read** carries the current 
Cursor Epoch.
+- Discard the reading and prevent the `read position` change if the epoch 
carried by the reading is smaller than the current Cursor Epoch when the 
reading complete
+
+**For the Goal-2**: Two-phase of Cursor Epoch increment
+- Mark the Cursor Epoch increase task is started.
+    - Set a new `read position`.
+    - Mark the cursor is modifying, at this moment, all the **Sequentially 
Read** and **Message Replay Read** will fail due to `CursorModifyingException`.
+- We can change attributes of the Cursor or attributes of the Dispatcher now.
+- End the Cursor Epoch increase task
+    - increase the epoch.
+    - remove the marker `modifying`.
+
+# Detailed Design
+
+**ManagedLedgerException.java**
+```java
+public static class CursorModifyingException extends ManagedLedgerException {};
+
+public static class CursorEpochConflictException extends 
ManagedLedgerException {};
+```
+
+<strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a 
`lock`)
+```java
+/** Indicates how many times the `read position` of the cursor has been 
changed by the **Occasional Read Position Change** **/
+private volatile int epoch;
+/**
+ * 0: no task is running.
+ * 1: the task is running 
+ */
+private volatile int isEpochIncreaseRunning;
+
+/** Return current epoch, will not care about if there has a running epoch 
increase task. **/
+public int getEpoch();
+
+/**
+* Set `isEpochIncreaseRunning` to true and return the current epoch.
+* throws CursorEpochConflictException if there a running epoch increase task 
is in progress. 
+*/
+public synchronized int startIncreaseEpoch(Supplier<Position> 
newReadPositionCalcultor);
+
+/**
+ * Compare and set `isEpochIncreaseRunning` to `0` and return the 
`isEpochIncreaseRunning` before the update. If there is no epoch increment in 
progress, will not increase and just return `0`.
+ */
+public synchronized int endIncreaseEpoch();
+
+/** Only used for the scenario Sequentially Read, return true if set succeed. 
**/
+public synchronized boolean setReadPosition(Position newPosition, int 
expectedCursorEpoch);
+
+/** Return false if there is a running epoch increase task or epoch was 
changed. **/
+public boolean isEpochOutdated(int expectedSpoch);
+```
+
+**CursorStats.java**
+
+```java
+public int epoch;
+public int isEpochIncreaseRunning;
+```
+
+A demo of `rewind` in Key_shared mode:
+
+```java
+try {
+  cursor.startIncreaseEpoch(() ->  markDeletedPostion + 1);
+  dispatcher.clearDelayedMessageTracker();
+  dispatcher.clearRedeliveryMessages();
+  dispatcher.clearRecentJoinedConsumers();
+  if (cursor.endIncreaseEpoch() == 2 ) {
+    readMoreEntries(); // compensate read once.
+  }
+} catch (CursorEpochConflictException ex) {
+  // do retry or something else.
+}
+```
+
+A demo of **Sequentially Read**:
+
+```java
+public synchronized void readMoreEntries() {
+  ReadEntriesCallback cb = this;
+  int entriesCountToRead = 10;
+  int epoch = cursor.getEpoch();
+  cursor.asyncRead(entriesCountToRead, cb, epoch);
+}
+
+public final synchronized void readEntriesComplete(List<Entry> entries, Object 
ctx) {
+  int cursorEpochWhenStartRead = (int) ctx;
+  // Normally “readEntriesComplete” and **Occasional Read Position Change** 
are mutually exclusive.
+  if (cursor.isEpochOutdated(cursorEpochWhenStartRead)) {
+    entries.forEach(Entry::release);
+    return;
+  }
+  // send messages to client.
+}
+```
+
+### Metrics
+
+**pulsar_cursor_epoch_increase_count**
+- Description: how many times the `read position` of the cursor has been 
changed by the **Occasional Read Position Change**
+- type: Counter
+- labels: `[cluster, namespace, topic, subscription]`
+- unit: times
+
+**pulsar_cursor_epoch_increase_task_state**

Review Comment:
   There is already a counter that does this 
`pulsar_cursor_epoch_increase_count`, and this one just represents a state of 
if there is a running Read Position Reset. 
   
   If `pulsar_cursor_epoch_increase_task_running` is not 0 for a long time, it 
indicates that there is a bug that will cause consumption stuck.
   



##########
pip/pip-269.md:
##########
@@ -0,0 +1,239 @@
+# Background knowledge
+
+The Cursor is used to read data from ledgers, there are three scenarios for 
reading messages(we can call these scenarios Read-Scenario):
+- **Sequentially Read**: reading some entries backward from the `read 
position` and updating the read position to the next position to be read when 
reading is finished.
+    - Read entries from `read position`(include), after the read is complete, 
set `read position` to the next position of the last entry of this read, then 
the next round of **Sequentially Read**.
+- **Message Replay Read**: Normally consumers will acknowledge messages all 
they received, but sometimes the consumer can not handle messages and asks the 
broker to redeliver these messages(these messages are always earlier than `read 
position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and 
there are some messages held by this consumer, these messages will be 
redelivered to other consumers. Message Dispatcher will cache the position of 
messages which should be redelivered, and Cursor will read messages which were 
cached in the memory of Message Dispatcher in the next round.
+- **Specified Read**(Read directly using Managed Ledger): there are some APIs 
that can read messages at a specified position, but generally do not affect the 
attributes of the Cursor or Message Dispatcher. E.g.
+    -  [`pulsar-admin topics 
examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages)
+    - [`pulsar-admin topics 
get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id)
+
+**Occasional Read Position Change**: As described above, the `read position` 
is very important to **Sequentially Read**, **Message Replay Read** expects the 
`read position` always be changed serialized by itself. There have some cases 
that will also change the `read position` of the Cursor, causing the Incorrect 
**Sequentially Read** and Incorrect **Message Replay Read**. For example:
+- [pulsar-admin topics 
reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor):
  set `read position` to a specified value.
+- 
[consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482):
 same as [pulsar-admin topics 
reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor),
 but it used by the Pulsar Client.
+- [pulsar-admin topics 
skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): 
skip many messages.
+- [pulsar-admin topics 
clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages)
+- 
[cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it
 is an internal API): set the `read position` to the next of `mark deleted 
position.`
+- 
[cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it
 is an internal API): just set the `read position` to a specified point, it is 
only used for Compaction now.
+- Other read position edit:
+    -  A compensation mechanism: Something wrong makes there a position the 
same as `read position` in the replay queue, 
https://github.com/apache/pulsar/issues/6403 provides an additional mechanism 
to increase the `read position` before reading to avoid repeated read(read by 
Message Replay Read once and read by Sequentially Read once).
+
+# Motivation
+
+If **Message Replay Read** and **Occasional Read Position Change** are 
executed concurrently, the consumption will be out of order. For example:
+| time | **Occasional Read Position Change** | **Message Replay Read** |
+| --- | --- | --- |
+| 1 | | Start to read `[3:5, 3:6]` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages |
+| 4 | Set read position to `3:0` |
+| 5 | | Read complete, and send `[3:5, 3:6]` to the client |
+
+In this scenario(referred to as **Scenario-1**), the consumer will receive 
`[3:5, 3:6]` first and then receive `[3:0.....]`
+
+---
+
+If **Sequentially Read** and **Occasional Read Position Change** are executed 
concurrently, the consumption will be out of order,  also leading to 
consumption being stuck. For example:
+| time | **Occasional Read Position Change** | **Sequentially Read** |
+| --- | --- | --- |
+| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` |
+| 2 | | Waiting for the response of BK |
+| 3 | Clear redelivery messages |
+| 4 | Set read position to `3:0` |
+| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` 
to the client |
+
+In this scenario(referred to as **Scenario-2**), the consumer will receive 
`[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the 
client until unloading the topic.
+
+---
+
+In **Occasional Read Position Change** is executed, the attributes(including 
`read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and 
the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. 
If these two actions are not executed atomicity, this will cause the attributes 
of Cursorto to be inconsistent with the attributes of Disptacher, and cause 
some problems. For example:
+| time | **Reset cursor** | **Consumer reconnect** |
+| --- | --- | --- |
+| 1 | disconnect all consumers |
+| 2 | | remove consumer |
+| 3 | | add consumer_1 |
+| 4 | | rewind cursor to `3:0` |
+| 5 | | add consumer_2 |
+| 6 | | add consumer_2 to recently joined consumers, `max read position` is 
`3:0`<sup>[4]</sup> |
+| 7 | set read position to `1:0` |
+
+In this scenario(referred to as **Scenario-3**), the `consumer_2` will be 
stuck by the mechanism Recently Joined Consumers of Key_Shared 
mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that:
+- If there has an in-flight reading.
+- If there is more than one **Occasional Read Position Change** action running.
+- Both above.
+
+---
+
+In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the 
issue caused by the action `cursor.rewind` and **Sequentially Read** executed 
concurrently(referred to as Rewind After In-flight Reading). It works like 
this: if there has an in-flight reading when calling `cursor.rewind`, just do 
rewind after the in-flight reading is finished.` but this mechanism has the 
following drawbacks:
+- Makes a new issue lead to a new issue<sup>[5]</sup>.
+- Only applicable to `cursor.rewind`.
+- Only applicable to Multi Consumer Dispatcher.
+- If there is more than one in-flight reading(one Sequentially Read and some 
Message Replay Read might exist simultaneously), this mechanism can not work.
+
+# Goals
+- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the 
in-flight reading(referred to as **Goal-1**).
+- For the **Scenario-3**(see Motivation), we should do these two things:
+    - Provide a way to guarantee the atomicity modification of both attributes 
of Cursor and attributes of Dispatcher(referred to as **Goal-2**).
+    - Prevent more than one **Occasional Read Position Change** executed at 
the same time(Current Proposal does not consider this optimization, there will 
be some bug fixes after this PIP)
+- Revert https://github.com/apache/pulsar/issues/6403
+- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup>
+
+# High-Level Design
+
+**For the Goal-1**: Cursor Epoch
+- Definition of a counter to record how many times the `read position` of the 
cursor has been changed by the **Occasional Read Position Change**, we call it 
Cursor Epoch. **Occasional Read Position Change** executed once, the Cursor 
Epoch increase once.
+- Every **Sequentially Read** or **Message Replay Read** carries the current 
Cursor Epoch.
+- Discard the reading and prevent the `read position` change if the epoch 
carried by the reading is smaller than the current Cursor Epoch when the 
reading complete
+
+**For the Goal-2**: Two-phase of Cursor Epoch increment
+- Mark the Cursor Epoch increase task is started.
+    - Set a new `read position`.
+    - Mark the cursor is modifying, at this moment, all the **Sequentially 
Read** and **Message Replay Read** will fail due to `CursorModifyingException`.
+- We can change attributes of the Cursor or attributes of the Dispatcher now.
+- End the Cursor Epoch increase task
+    - increase the epoch.
+    - remove the marker `modifying`.
+
+# Detailed Design
+
+**ManagedLedgerException.java**
+```java
+public static class CursorModifyingException extends ManagedLedgerException {};
+
+public static class CursorEpochConflictException extends 
ManagedLedgerException {};
+```
+
+<strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a 
`lock`)
+```java
+/** Indicates how many times the `read position` of the cursor has been 
changed by the **Occasional Read Position Change** **/
+private volatile int epoch;
+/**
+ * 0: no task is running.
+ * 1: the task is running 
+ */
+private volatile int isEpochIncreaseRunning;
+
+/** Return current epoch, will not care about if there has a running epoch 
increase task. **/
+public int getEpoch();
+
+/**
+* Set `isEpochIncreaseRunning` to true and return the current epoch.
+* throws CursorEpochConflictException if there a running epoch increase task 
is in progress. 
+*/
+public synchronized int startIncreaseEpoch(Supplier<Position> 
newReadPositionCalcultor);
+
+/**
+ * Compare and set `isEpochIncreaseRunning` to `0` and return the 
`isEpochIncreaseRunning` before the update. If there is no epoch increment in 
progress, will not increase and just return `0`.
+ */
+public synchronized int endIncreaseEpoch();
+
+/** Only used for the scenario Sequentially Read, return true if set succeed. 
**/
+public synchronized boolean setReadPosition(Position newPosition, int 
expectedCursorEpoch);
+
+/** Return false if there is a running epoch increase task or epoch was 
changed. **/
+public boolean isEpochOutdated(int expectedSpoch);
+```
+
+**CursorStats.java**
+
+```java
+public int epoch;
+public int isEpochIncreaseRunning;
+```
+
+A demo of `rewind` in Key_shared mode:
+
+```java
+try {
+  cursor.startIncreaseEpoch(() ->  markDeletedPostion + 1);
+  dispatcher.clearDelayedMessageTracker();
+  dispatcher.clearRedeliveryMessages();
+  dispatcher.clearRecentJoinedConsumers();
+  if (cursor.endIncreaseEpoch() == 2 ) {
+    readMoreEntries(); // compensate read once.
+  }
+} catch (CursorEpochConflictException ex) {
+  // do retry or something else.
+}
+```
+
+A demo of **Sequentially Read**:
+
+```java
+public synchronized void readMoreEntries() {
+  ReadEntriesCallback cb = this;
+  int entriesCountToRead = 10;
+  int epoch = cursor.getEpoch();
+  cursor.asyncRead(entriesCountToRead, cb, epoch);
+}
+
+public final synchronized void readEntriesComplete(List<Entry> entries, Object 
ctx) {
+  int cursorEpochWhenStartRead = (int) ctx;
+  // Normally “readEntriesComplete” and **Occasional Read Position Change** 
are mutually exclusive.
+  if (cursor.isEpochOutdated(cursorEpochWhenStartRead)) {
+    entries.forEach(Entry::release);
+    return;
+  }
+  // send messages to client.
+}
+```
+
+### Metrics
+
+**pulsar_cursor_epoch_increase_count**
+- Description: how many times the `read position` of the cursor has been 
changed by the **Occasional Read Position Change**
+- type: Counter
+- labels: `[cluster, namespace, topic, subscription]`
+- unit: times
+
+**pulsar_cursor_epoch_increase_task_state**

Review Comment:
   There is already a counter that does this 
`pulsar_cursor_epoch_increase_count`, and this one just represents a state of 
if there is a running Read Position Reset. 
   
   If `pulsar_cursor_epoch_increase_task_running` is `true` for a long time, it 
indicates that there is a bug that will cause consumption stuck.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to