asafm commented on code in PR #20469: URL: https://github.com/apache/pulsar/pull/20469#discussion_r1217038671
########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. Review Comment: Why backward? Reading sequentially is forward ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. 
+ - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. +- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. 
+- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + +# Motivation + +If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order. For example: +| time | **Occasional Read Position Change** | **Message Replay Read** | +| --- | --- | --- | +| 1 | | Start to read `[3:5, 3:6]` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, and send `[3:5, 3:6]` to the client | + +In this scenario(referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]` + +--- + +If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order, also leading to consumption being stuck. For example: +| time | **Occasional Read Position Change** | **Sequentially Read** | +| --- | --- | --- | +| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` to the client | + +In this scenario(referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the client until unloading the topic. + +--- + +In **Occasional Read Position Change** is executed, the attributes(including `read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. 
If these two actions are not executed atomicity, this will cause the attributes of Cursorto to be inconsistent with the attributes of Disptacher, and cause some problems. For example: +| time | **Reset cursor** | **Consumer reconnect** | +| --- | --- | --- | +| 1 | disconnect all consumers | +| 2 | | remove consumer | +| 3 | | add consumer_1 | +| 4 | | rewind cursor to `3:0` | +| 5 | | add consumer_2 | +| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> | +| 7 | set read position to `1:0` | + +In this scenario(referred to as **Scenario-3**), the `consumer_2` will be stuck by the mechanism Recently Joined Consumers of Key_Shared mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that: +- If there has an in-flight reading. +- If there is more than one **Occasional Read Position Change** action running. +- Both above. + +--- + +In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the issue caused by the action `cursor.rewind` and **Sequentially Read** executed concurrently(referred to as Rewind After In-flight Reading). It works like this: if there has an in-flight reading when calling `cursor.rewind`, just do rewind after the in-flight reading is finished.` but this mechanism has the following drawbacks: +- Makes a new issue lead to a new issue<sup>[5]</sup>. +- Only applicable to `cursor.rewind`. +- Only applicable to Multi Consumer Dispatcher. +- If there is more than one in-flight reading(one Sequentially Read and some Message Replay Read might exist simultaneously), this mechanism can not work. + +# Goals +- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the in-flight reading(referred to as **Goal-1**). +- For the **Scenario-3**(see Motivation), we should do these two things: + - Provide a way to guarantee the atomicity modification of both attributes of Cursor and attributes of Dispatcher(referred to as **Goal-2**). 
+ - Prevent more than one **Occasional Read Position Change** executed at the same time(Current Proposal does not consider this optimization, there will be some bug fixes after this PIP) +- Revert https://github.com/apache/pulsar/issues/6403 +- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup> + +# High-Level Design + +**For the Goal-1**: Cursor Epoch
Review Comment:
OK. First, let me try to phrase your idea in my own words. Tell me if I'm correct.

You're basically adding a variable to the cursor, named (my name, of course) `readPositionExternalResetCount`. This variable is increased whenever the read position is about to be reset to some value by an external action such as reset-subscription, rewind, etc. Every time a read sequence begins, it starts by recording `readPositionExternalResetCount`. When the read has finished and the read position is about to be updated, it verifies that no external reset has happened by checking that `readPositionExternalResetCount` has not increased relative to the recorded value. If it has, a reset action has started, which means we can stop and drop the read sequence we started. If it has not, no reset was done, so we continue and update the read position.

Please note that even this is not transactional and not fully safe.

What I suggest:
1. Rename the variable. In computer science, "epoch" mostly refers to the number of seconds that have passed since 1970-01-01. See https://en.wikipedia.org/wiki/Epoch_(computing)
2. Checking that the count has not increased and only then updating the read position is not fully safe. It is better to guard the read position update with a lock, since you're taking an action based on the count. I would take a lock on the read position and, inside that lock, update the count if needed and the read position if needed.
3. I wouldn't use an exception for flow control. It's an expensive operation, especially for latency-sensitive systems. Can we return a value instead, such as a Result of some sort?
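To make suggestions 2 and 3 a bit more concrete, here is a minimal sketch of what I have in mind. It is only an illustration under assumptions, not the `ManagedCursorImpl` API: the class `ReadPositionGuard`, the enum `UpdateResult`, and all method names are hypothetical, and the read position is simplified to a single `long` instead of a `ledgerId:entryId` pair. The check of the reset count and the update of the read position happen under the same lock, and a stale read is reported through a return value rather than an exception.

```java
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical sketch, not Pulsar code: one lock guards both a simplified
 * read position and the external-reset counter.
 */
final class ReadPositionGuard {

    /** Outcome of applying a finished read, returned instead of throwing. */
    enum UpdateResult { APPLIED, DISCARDED_STALE_READ }

    private final ReentrantLock lock = new ReentrantLock();
    // Simplification: a real position would be a ledgerId:entryId pair.
    private long readPosition;
    // Name taken from the review comment; counts external resets only.
    private long readPositionExternalResetCount;

    ReadPositionGuard(long initialReadPosition) {
        this.readPosition = initialReadPosition;
    }

    /** Called when a read starts: returns the reset count to carry with the read. */
    long beginRead() {
        lock.lock();
        try {
            return readPositionExternalResetCount;
        } finally {
            lock.unlock();
        }
    }

    /** Called by an external reset (reset-cursor, seek, rewind, and so on). */
    void externalReset(long newReadPosition) {
        lock.lock();
        try {
            readPositionExternalResetCount++;
            readPosition = newReadPosition;
        } finally {
            lock.unlock();
        }
    }

    /** Called when a read finishes: the check and the update share one critical section. */
    UpdateResult completeRead(long countAtReadStart, long nextReadPosition) {
        lock.lock();
        try {
            if (countAtReadStart != readPositionExternalResetCount) {
                // A reset happened while the read was in flight: drop the read.
                return UpdateResult.DISCARDED_STALE_READ;
            }
            readPosition = nextReadPosition;
            return UpdateResult.APPLIED;
        } finally {
            lock.unlock();
        }
    }

    long getReadPosition() {
        lock.lock();
        try {
            return readPosition;
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, the interleaving from Scenario-2 (a read starts at position 5, a reset moves the position to 0, then the read completes) ends with `completeRead` returning `DISCARDED_STALE_READ` and the read position staying at 0. The caller would release the entries and schedule a new read. Keeping the Dispatcher attributes consistent with the Cursor (Goal-2) is outside what this sketch covers.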
########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. + - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. 
There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. +- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. +- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + +# Motivation + +If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order. 
For example: +| time | **Occasional Read Position Change** | **Message Replay Read** | +| --- | --- | --- | +| 1 | | Start to read `[3:5, 3:6]` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, and send `[3:5, 3:6]` to the client | + +In this scenario(referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]` + +--- + +If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order, also leading to consumption being stuck. For example: +| time | **Occasional Read Position Change** | **Sequentially Read** | +| --- | --- | --- | +| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` to the client | + +In this scenario(referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the client until unloading the topic. + +--- + +In **Occasional Read Position Change** is executed, the attributes(including `read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. If these two actions are not executed atomicity, this will cause the attributes of Cursorto to be inconsistent with the attributes of Disptacher, and cause some problems. 
For example: +| time | **Reset cursor** | **Consumer reconnect** | +| --- | --- | --- | +| 1 | disconnect all consumers | +| 2 | | remove consumer | +| 3 | | add consumer_1 | +| 4 | | rewind cursor to `3:0` | +| 5 | | add consumer_2 | +| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> | +| 7 | set read position to `1:0` | + +In this scenario(referred to as **Scenario-3**), the `consumer_2` will be stuck by the mechanism Recently Joined Consumers of Key_Shared mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that: +- If there has an in-flight reading. +- If there is more than one **Occasional Read Position Change** action running. +- Both above. + +--- + +In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the issue caused by the action `cursor.rewind` and **Sequentially Read** executed concurrently(referred to as Rewind After In-flight Reading). It works like this: if there has an in-flight reading when calling `cursor.rewind`, just do rewind after the in-flight reading is finished.` but this mechanism has the following drawbacks: +- Makes a new issue lead to a new issue<sup>[5]</sup>. +- Only applicable to `cursor.rewind`. +- Only applicable to Multi Consumer Dispatcher. +- If there is more than one in-flight reading(one Sequentially Read and some Message Replay Read might exist simultaneously), this mechanism can not work. + +# Goals +- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the in-flight reading(referred to as **Goal-1**). +- For the **Scenario-3**(see Motivation), we should do these two things: + - Provide a way to guarantee the atomicity modification of both attributes of Cursor and attributes of Dispatcher(referred to as **Goal-2**). 
+ - Prevent more than one **Occasional Read Position Change** executed at the same time(Current Proposal does not consider this optimization, there will be some bug fixes after this PIP) +- Revert https://github.com/apache/pulsar/issues/6403 +- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup> + +# High-Level Design + +**For the Goal-1**: Cursor Epoch +- Definition of a counter to record how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change**, we call it Cursor Epoch. **Occasional Read Position Change** executed once, the Cursor Epoch increase once. +- Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch. +- Discard the reading and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch when the reading complete + +**For the Goal-2**: Two-phase of Cursor Epoch increment +- Mark the Cursor Epoch increase task is started. + - Set a new `read position`. + - Mark the cursor is modifying, at this moment, all the **Sequentially Read** and **Message Replay Read** will fail due to `CursorModifyingException`. +- We can change attributes of the Cursor or attributes of the Dispatcher now. +- End the Cursor Epoch increase task + - increase the epoch. + - remove the marker `modifying`. + +# Detailed Design + +**ManagedLedgerException.java** +```java +public static class CursorModifyingException extends ManagedLedgerException {}; + +public static class CursorEpochConflictException extends ManagedLedgerException {}; +``` + +<strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a `lock`) +```java +/** Indicates how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change** **/ +private volatile int epoch; +/** + * 0: no task is running. 
+ * 1: the task is running + */ +private volatile int isEpochIncreaseRunning; + +/** Return current epoch, will not care about if there has a running epoch increase task. **/ +public int getEpoch(); + +/** +* Set `isEpochIncreaseRunning` to true and return the current epoch. +* throws CursorEpochConflictException if there a running epoch increase task is in progress. +*/ +public synchronized int startIncreaseEpoch(Supplier<Position> newReadPositionCalcultor); + +/** + * Compare and set `isEpochIncreaseRunning` to `0` and return the `isEpochIncreaseRunning` before the update. If there is no epoch increment in progress, will not increase and just return `0`. + */ +public synchronized int endIncreaseEpoch(); + +/** Only used for the scenario Sequentially Read, return true if set succeed. **/ +public synchronized boolean setReadPosition(Position newPosition, int expectedCursorEpoch); + +/** Return false if there is a running epoch increase task or epoch was changed. **/ +public boolean isEpochOutdated(int expectedSpoch); +``` + +**CursorStats.java** + +```java +public int epoch; +public int isEpochIncreaseRunning; +``` + +A demo of `rewind` in Key_shared mode: + +```java +try { + cursor.startIncreaseEpoch(() -> markDeletedPostion + 1); + dispatcher.clearDelayedMessageTracker(); + dispatcher.clearRedeliveryMessages(); + dispatcher.clearRecentJoinedConsumers(); + if (cursor.endIncreaseEpoch() == 2 ) { + readMoreEntries(); // compensate read once. + } +} catch (CursorEpochConflictException ex) { + // do retry or something else. 
+} +``` + +A demo of **Sequentially Read**: + +```java +public synchronized void readMoreEntries() { + ReadEntriesCallback cb = this; + int entriesCountToRead = 10; + int epoch = cursor.getEpoch(); + cursor.asyncRead(entriesCountToRead, cb, epoch); +} + +public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) { + int cursorEpochWhenStartRead = (int) ctx; + // Normally “readEntriesComplete” and **Occasional Read Position Change** are mutually exclusive. + if (cursor.isEpochOutdated(cursorEpochWhenStartRead)) { + entries.forEach(Entry::release); + return; + } + // send messages to client. +} +``` + +### Metrics + +**pulsar_cursor_epoch_increase_count** +- Description: how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change** +- type: Counter +- labels: `[cluster, namespace, topic, subscription]` +- unit: times + +**pulsar_cursor_epoch_increase_task_state** Review Comment: Better to expose this as a count: the state will change rapidly, so when sampling every 30 seconds you won't see anything. ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. 
call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. Review Comment: You mean direct read (bypass cursor)? ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. 
+- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. + - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: Review Comment: Maybe "**Occasional Read Position Change**" --> "External Read Position Reset" or "out-of-band read position reset" or "read position reset"? ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. 
Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. Review Comment: In those instances where the read is served from the "cache" of negatively acknowledged messages, will it update the read position? ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. 
+ - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. +- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. 
+- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + +# Motivation + +If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order. For example: +| time | **Occasional Read Position Change** | **Message Replay Read** | +| --- | --- | --- | +| 1 | | Start to read `[3:5, 3:6]` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, and send `[3:5, 3:6]` to the client | + +In this scenario(referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]` + +--- + +If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order, also leading to consumption being stuck. For example: +| time | **Occasional Read Position Change** | **Sequentially Read** | +| --- | --- | --- | +| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` to the client | + +In this scenario(referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the client until unloading the topic. + +--- + +In **Occasional Read Position Change** is executed, the attributes(including `read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. 
If these two actions are not executed atomicity, this will cause the attributes of Cursorto to be inconsistent with the attributes of Disptacher, and cause some problems. For example: Review Comment: "Cursorto" -> "Cursor to" ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. 
+ - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: Review Comment: "expects the `read position` always be changed serialized by itself" - I can't understand the relationship of serialization to read negative ack messages. Can you explain? ########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. 
Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. + - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. 
+- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. +- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + +# Motivation + +If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order. For example: +| time | **Occasional Read Position Change** | **Message Replay Read** | +| --- | --- | --- | +| 1 | | Start to read `[3:5, 3:6]` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, and send `[3:5, 3:6]` to the client | + +In this scenario(referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]` + +--- + +If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order, also leading to consumption being stuck. 
For example: +| time | **Occasional Read Position Change** | **Sequentially Read** | +| --- | --- | --- | +| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` to the client | + +In this scenario(referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the client until unloading the topic. + +--- + +In **Occasional Read Position Change** is executed, the attributes(including `read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. If these two actions are not executed atomicity, this will cause the attributes of Cursorto to be inconsistent with the attributes of Disptacher, and cause some problems. For example: +| time | **Reset cursor** | **Consumer reconnect** | +| --- | --- | --- | +| 1 | disconnect all consumers | +| 2 | | remove consumer | +| 3 | | add consumer_1 | +| 4 | | rewind cursor to `3:0` | +| 5 | | add consumer_2 | +| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> | +| 7 | set read position to `1:0` | + +In this scenario(referred to as **Scenario-3**), the `consumer_2` will be stuck by the mechanism Recently Joined Consumers of Key_Shared mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that: +- If there has an in-flight reading. +- If there is more than one **Occasional Read Position Change** action running. +- Both above. + +--- + +In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the issue caused by the action `cursor.rewind` and **Sequentially Read** executed concurrently(referred to as Rewind After In-flight Reading). 
It works like this: if there has an in-flight reading when calling `cursor.rewind`, just do rewind after the in-flight reading is finished.` but this mechanism has the following drawbacks: +- Makes a new issue lead to a new issue<sup>[5]</sup>. +- Only applicable to `cursor.rewind`. +- Only applicable to Multi Consumer Dispatcher. +- If there is more than one in-flight reading(one Sequentially Read and some Message Replay Read might exist simultaneously), this mechanism can not work. + +# Goals +- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the in-flight reading(referred to as **Goal-1**). +- For the **Scenario-3**(see Motivation), we should do these two things: + - Provide a way to guarantee the atomicity modification of both attributes of Cursor and attributes of Dispatcher(referred to as **Goal-2**). + - Prevent more than one **Occasional Read Position Change** executed at the same time(Current Proposal does not consider this optimization, there will be some bug fixes after this PIP) +- Revert https://github.com/apache/pulsar/issues/6403 +- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup> + +# High-Level Design + +**For the Goal-1**: Cursor Epoch +- Definition of a counter to record how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change**, we call it Cursor Epoch. **Occasional Read Position Change** executed once, the Cursor Epoch increase once. +- Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch. +- Discard the reading and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch when the reading complete + +**For the Goal-2**: Two-phase of Cursor Epoch increment Review Comment: Lost you here. Why do you need to use same epoch. I wonder why not a lock perhaps for cursor and dispatcher. I guess I need to dive more perhaps. 
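For reference, the Goal-1 epoch check quoted above can be sketched as follows. This is only an illustration of one reading of the proposal, not Pulsar code: `EpochGuardedReader`, `startRead`, `changeReadPosition`, and `completeRead` are hypothetical names, and entries are modelled as plain strings. The sketch covers only the epoch variant described in the PIP, not the lock alternative raised in the comment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a cursor guarded by a Cursor Epoch; not a Pulsar class. */
final class EpochGuardedReader {
    private int epoch;
    private final List<String> delivered = new ArrayList<>();

    /** A read captures the epoch it starts under and carries it to completion. */
    synchronized int startRead() {
        return epoch;
    }

    /** Models an Occasional Read Position Change (for example reset-cursor). */
    synchronized void changeReadPosition() {
        epoch++;
    }

    /** Delivers entries only if the epoch did not move while the read was in flight. */
    synchronized boolean completeRead(int epochAtStart, List<String> entries) {
        if (epochAtStart != epoch) {
            return false; // stale read: discard entries, do not touch the read position
        }
        delivered.addAll(entries);
        return true;
    }

    /** Returns a copy of everything delivered so far. */
    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

Applied to Scenario-1, a replay read of `[3:5, 3:6]` that starts before the reset and completes after it is rejected by `completeRead`, so nothing is sent ahead of `3:0`.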
########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. + - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. 
There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. +- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. +- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + Review Comment: Overall, great background ! 
########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. + - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. 
There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. +- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. +- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + +# Motivation + +If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order. 
For example: +| time | **Occasional Read Position Change** | **Message Replay Read** | +| --- | --- | --- | +| 1 | | Start to read `[3:5, 3:6]` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, and send `[3:5, 3:6]` to the client | + +In this scenario(referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]` + +--- + +If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order, also leading to consumption being stuck. For example: +| time | **Occasional Read Position Change** | **Sequentially Read** | +| --- | --- | --- | +| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` to the client | + +In this scenario(referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the client until unloading the topic. + +--- + +In **Occasional Read Position Change** is executed, the attributes(including `read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. If these two actions are not executed atomicity, this will cause the attributes of Cursorto to be inconsistent with the attributes of Disptacher, and cause some problems. 
For example: +| time | **Reset cursor** | **Consumer reconnect** | +| --- | --- | --- | +| 1 | disconnect all consumers | +| 2 | | remove consumer | +| 3 | | add consumer_1 | +| 4 | | rewind cursor to `3:0` | +| 5 | | add consumer_2 | +| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> | +| 7 | set read position to `1:0` | + +In this scenario(referred to as **Scenario-3**), the `consumer_2` will be stuck by the mechanism Recently Joined Consumers of Key_Shared mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that: +- If there has an in-flight reading. +- If there is more than one **Occasional Read Position Change** action running. +- Both above. + +--- + +In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the issue caused by the action `cursor.rewind` and **Sequentially Read** executed concurrently(referred to as Rewind After In-flight Reading). It works like this: if there has an in-flight reading when calling `cursor.rewind`, just do rewind after the in-flight reading is finished.` but this mechanism has the following drawbacks: +- Makes a new issue lead to a new issue<sup>[5]</sup>. +- Only applicable to `cursor.rewind`. +- Only applicable to Multi Consumer Dispatcher. +- If there is more than one in-flight reading(one Sequentially Read and some Message Replay Read might exist simultaneously), this mechanism can not work. + +# Goals +- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the in-flight reading(referred to as **Goal-1**). +- For the **Scenario-3**(see Motivation), we should do these two things: + - Provide a way to guarantee the atomicity modification of both attributes of Cursor and attributes of Dispatcher(referred to as **Goal-2**). 
+ - Prevent more than one **Occasional Read Position Change** executed at the same time(Current Proposal does not consider this optimization, there will be some bug fixes after this PIP) +- Revert https://github.com/apache/pulsar/issues/6403 +- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup> + +# High-Level Design + +**For the Goal-1**: Cursor Epoch +- Definition of a counter to record how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change**, we call it Cursor Epoch. **Occasional Read Position Change** executed once, the Cursor Epoch increase once. +- Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch. +- Discard the reading and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch when the reading complete + +**For the Goal-2**: Two-phase of Cursor Epoch increment +- Mark the Cursor Epoch increase task is started. + - Set a new `read position`. + - Mark the cursor is modifying, at this moment, all the **Sequentially Read** and **Message Replay Read** will fail due to `CursorModifyingException`. +- We can change attributes of the Cursor or attributes of the Dispatcher now. +- End the Cursor Epoch increase task + - increase the epoch. + - remove the marker `modifying`. + +# Detailed Design + +**ManagedLedgerException.java** +```java +public static class CursorModifyingException extends ManagedLedgerException {}; + +public static class CursorEpochConflictException extends ManagedLedgerException {}; +``` + +<strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a `lock`) Review Comment: Please, let's not add more code to `ManagedCursorImpl` - it becomes a dumpster. Let's try to concentrate the same domain into one class. First, I need to understand the high level design of goal 2 to be able to give good suggestions here. 
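One way to keep this state out of `ManagedCursorImpl` is to put the two-phase epoch logic in a small class of its own. The following is a minimal sketch under stated assumptions: `CursorEpochTracker` and its method names are hypothetical, `IllegalStateException` stands in for the proposed `CursorEpochConflictException`, and the return values are simplified compared with the `endIncreaseEpoch` contract quoted above.

```java
/** Hypothetical holder for the two-phase Cursor Epoch state; not a Pulsar class. */
final class CursorEpochTracker {
    private int epoch;
    private boolean modifying;

    /** Phase 1: mark the cursor as being modified and return the epoch before the change. */
    synchronized int startIncrease() {
        if (modifying) {
            // Stand-in for the proposed CursorEpochConflictException.
            throw new IllegalStateException("an epoch increase is already running");
        }
        modifying = true;
        return epoch;
    }

    /** Phase 2: bump the epoch, clear the marker, and return the new epoch. */
    synchronized int endIncrease() {
        if (!modifying) {
            throw new IllegalStateException("no epoch increase is running");
        }
        modifying = false;
        return ++epoch;
    }

    /** A read is outdated if a change is running or the epoch moved since the read began. */
    synchronized boolean isOutdated(int epochAtReadStart) {
        return modifying || epochAtReadStart != epoch;
    }

    /** Current epoch, regardless of whether an increase is running. */
    synchronized int current() {
        return epoch;
    }
}
```

With this shape, the cursor and the dispatcher would both consult the same tracker instance, and changes to either would happen between `startIncrease()` and `endIncrease()`.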
########## pip/pip-269.md: ########## @@ -0,0 +1,239 @@ +# Background knowledge + +The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): +- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. + - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. +- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. +- **Specified Read**(Read directly using Managed Ledger): there are some APIs that can read messages at a specified position, but generally do not affect the attributes of the Cursor or Message Dispatcher. E.g. + - [`pulsar-admin topics examine-messages`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#examine-messages) + - [`pulsar-admin topics get-message-by-id`](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#get-message-by-id) + +**Occasional Read Position Change**: As described above, the `read position` is very important to **Sequentially Read**, **Message Replay Read** expects the `read position` always be changed serialized by itself. 
There have some cases that will also change the `read position` of the Cursor, causing the Incorrect **Sequentially Read** and Incorrect **Message Replay Read**. For example: +- [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor): set `read position` to a specified value. +- [consumer.seek](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L482): same as [pulsar-admin topics reset-cursor](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#reset-cursor), but it used by the Pulsar Client. +- [pulsar-admin topics skip](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-messages): skip many messages. +- [pulsar-admin topics clear-backlog](https://pulsar.apache.org/docs/3.0.x/admin-api-topics/#skip-all-messages) +- [cursor.rewind](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2477)<sup>[1]</sup>(it is an internal API): set the `read position` to the next of `mark deleted position.` +- [cursor.seek](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2493)(it is an internal API): just set the `read position` to a specified point, it is only used for Compaction now. +- Other read position edit: + - A compensation mechanism: Something wrong makes there a position the same as `read position` in the replay queue, https://github.com/apache/pulsar/issues/6403 provides an additional mechanism to increase the `read position` before reading to avoid repeated read(read by Message Replay Read once and read by Sequentially Read once). + +# Motivation + +If **Message Replay Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order. 
For example: +| time | **Occasional Read Position Change** | **Message Replay Read** | +| --- | --- | --- | +| 1 | | Start to read `[3:5, 3:6]` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, and send `[3:5, 3:6]` to the client | + +In this scenario(referred to as **Scenario-1**), the consumer will receive `[3:5, 3:6]` first and then receive `[3:0.....]` + +--- + +If **Sequentially Read** and **Occasional Read Position Change** are executed concurrently, the consumption will be out of order, also leading to consumption being stuck. For example: +| time | **Occasional Read Position Change** | **Sequentially Read** | +| --- | --- | --- | +| 1 | | Start to read `{readPosition=3:5, entriesNum=5}` | +| 2 | | Waiting for the response of BK | +| 3 | Clear redelivery messages | +| 4 | Set read position to `3:0` | +| 5 | | Read complete, set the read position to `3:7`, and send `[3:5 ~ 3:9]` to the client | + +In this scenario(referred to as **Scenario-2**), the consumer will receive `[3:5 ~ 3:9]` first and the messages `[3:0~3:4]` will not be delivered to the client until unloading the topic. + +--- + +In **Occasional Read Position Change** is executed, the attributes(including `read position`) of the Cursor will be changed(`action 1`)<sup>[2]</sup>, and the attributes of the Dispatcher(`action 2`) will be changed too<sup>[3]</sup>. If these two actions are not executed atomicity, this will cause the attributes of Cursorto to be inconsistent with the attributes of Disptacher, and cause some problems. 
For example: +| time | **Reset cursor** | **Consumer reconnect** | +| --- | --- | --- | +| 1 | disconnect all consumers | +| 2 | | remove consumer | +| 3 | | add consumer_1 | +| 4 | | rewind cursor to `3:0` | +| 5 | | add consumer_2 | +| 6 | | add consumer_2 to recently joined consumers, `max read position` is `3:0`<sup>[4]</sup> | +| 7 | set read position to `1:0` | + +In this scenario(referred to as **Scenario-3**), the `consumer_2` will be stuck by the mechanism Recently Joined Consumers of Key_Shared mode<sup>[4]</sup>. It's a simple question, but it's more complicated than that: +- If there has an in-flight reading. +- If there is more than one **Occasional Read Position Change** action running. +- Both above. + +--- + +In the Multi Consumer Dispatcher, there has a mechanism that tries to fix the issue caused by the action `cursor.rewind` and **Sequentially Read** executed concurrently(referred to as Rewind After In-flight Reading). It works like this: if there has an in-flight reading when calling `cursor.rewind`, just do rewind after the in-flight reading is finished.` but this mechanism has the following drawbacks: +- Makes a new issue lead to a new issue<sup>[5]</sup>. +- Only applicable to `cursor.rewind`. +- Only applicable to Multi Consumer Dispatcher. +- If there is more than one in-flight reading(one Sequentially Read and some Message Replay Read might exist simultaneously), this mechanism can not work. + +# Goals +- For the **Scenario-1** and **Scenario-2**(see Motivation), discard the in-flight reading(referred to as **Goal-1**). +- For the **Scenario-3**(see Motivation), we should do these two things: + - Provide a way to guarantee the atomicity modification of both attributes of Cursor and attributes of Dispatcher(referred to as **Goal-2**). 
+ - Prevent more than one **Occasional Read Position Change** executed at the same time(Current Proposal does not consider this optimization, there will be some bug fixes after this PIP) +- Revert https://github.com/apache/pulsar/issues/6403 +- Remove the mechanism Rewind After In-flight Reading<sup>[5]</sup> + +# High-Level Design + +**For the Goal-1**: Cursor Epoch +- Definition of a counter to record how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change**, we call it Cursor Epoch. **Occasional Read Position Change** executed once, the Cursor Epoch increase once. +- Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch. +- Discard the reading and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch when the reading complete + +**For the Goal-2**: Two-phase of Cursor Epoch increment +- Mark the Cursor Epoch increase task is started. + - Set a new `read position`. + - Mark the cursor is modifying, at this moment, all the **Sequentially Read** and **Message Replay Read** will fail due to `CursorModifyingException`. +- We can change attributes of the Cursor or attributes of the Dispatcher now. +- End the Cursor Epoch increase task + - increase the epoch. + - remove the marker `modifying`. + +# Detailed Design + +**ManagedLedgerException.java** +```java +public static class CursorModifyingException extends ManagedLedgerException {}; + +public static class CursorEpochConflictException extends ManagedLedgerException {}; +``` + +<strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a `lock`) +```java +/** Indicates how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change** **/ +private volatile int epoch; +/** + * 0: no task is running. 
+ * 1: the task is running + */ +private volatile int isEpochIncreaseRunning; + +/** Return current epoch, will not care about if there has a running epoch increase task. **/ +public int getEpoch(); + +/** +* Set `isEpochIncreaseRunning` to true and return the current epoch. +* throws CursorEpochConflictException if there a running epoch increase task is in progress. +*/ +public synchronized int startIncreaseEpoch(Supplier<Position> newReadPositionCalcultor); + +/** + * Compare and set `isEpochIncreaseRunning` to `0` and return the `isEpochIncreaseRunning` before the update. If there is no epoch increment in progress, will not increase and just return `0`. + */ +public synchronized int endIncreaseEpoch(); + +/** Only used for the scenario Sequentially Read, return true if set succeed. **/ +public synchronized boolean setReadPosition(Position newPosition, int expectedCursorEpoch); + +/** Return false if there is a running epoch increase task or epoch was changed. **/ +public boolean isEpochOutdated(int expectedSpoch); +``` + +**CursorStats.java** + +```java +public int epoch; +public int isEpochIncreaseRunning; +``` + +A demo of `rewind` in Key_shared mode: + +```java +try { + cursor.startIncreaseEpoch(() -> markDeletedPostion + 1); + dispatcher.clearDelayedMessageTracker(); + dispatcher.clearRedeliveryMessages(); + dispatcher.clearRecentJoinedConsumers(); + if (cursor.endIncreaseEpoch() == 2 ) { + readMoreEntries(); // compensate read once. + } +} catch (CursorEpochConflictException ex) { + // do retry or something else. 
+} +``` + +A demo of **Sequentially Read**: + +```java +public synchronized void readMoreEntries() { + ReadEntriesCallback cb = this; + int entriesCountToRead = 10; + int epoch = cursor.getEpoch(); + cursor.asyncRead(entriesCountToRead, cb, epoch); +} + +public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) { + int cursorEpochWhenStartRead = (int) ctx; + // Normally “readEntriesComplete” and **Occasional Read Position Change** are mutually exclusive. + if (cursor.isEpochOutdated(cursorEpochWhenStartRead)) { + entries.forEach(Entry::release); + return; + } + // send messages to client. +} +``` + +### Metrics + +**pulsar_cursor_epoch_increase_count** +- Description: how many times the `read position` of the cursor has been changed by the **Occasional Read Position Change** +- type: Counter +- labels: `[cluster, namespace, topic, subscription]` +- unit: times + +**pulsar_cursor_epoch_increase_task_state** +- Description: + - 0: no Cursor Epoch increase task is running. + - 1: the Cursor Epoch increase task is running +- type: Enumeration +- labels: `[cluster, namespace, topic, subscription]` + +# Monitoring +- If the frequency of `pulsar_cursor_epoch` is high, it means that the client uses a lot of APIs to manually modify the `read position` of the Cursor; or restarts all clients frequently, resulting in frequent `rewind`. +- **Alert**: If `pulsar_cursor_epoch_increase_task_running` is not 0 for a long time, it indicates that there is a bug that will cause consumption stuck. + +# Backward & Forward Compatability + +Nothing should be cared for. + +# Links Review Comment: Foot notes perhaps?
