poorbarcode commented on code in PR #20776:
URL: https://github.com/apache/pulsar/pull/20776#discussion_r1264939773


##########
pip/pip-282.md:
##########
@@ -0,0 +1,283 @@
+# Background knowledge
+
+Key_Shared is one of the subscription types which allows multiple consumer 
connections.
+Messages are distributed across consumers, and messages with the same key or 
same ordering key are delivered to only one consumer.
+No matter how many times the message is re-delivered, it is delivered to the 
same consumer.
+
+When disabling `allowOutOfOrderDelivery`, Key_Shared subscription guarantees a 
key will be processed in order by a single consumer, even if a new consumer is 
connected.
+
+# Motivation
+
+Key_Shared has a mechanism called the "recently joined consumers" to keep 
message ordering.
+However, it currently does not handle some corner cases.
+More specifically, we found two out-of-order issues caused by:
+
+1. [issue-1] The race condition in the "recently joined consumers", where 
consumers can be added before finishing reading and dispatching messages from 
ledgers.
+2. [issue-2] Messages could be added to messagesToRedeliver without 
consumer-side operations such as unacknowledgement.
+
+We should care about these cases in Key_Shared subscription.
+
+## [issue-1]
+
+Key_Shared subscription has out-of-order cases because of the race condition 
of [the "recently joined 
consumers"](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L378-L386).
  
+Consider the following flow.
+
+1. Assume that the current read position is `1:6` and the recently joined 
consumers is empty.
+2. Called 
[OpReadEntry#internalReadEntriesComplete](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L92-L95)
 from thread-1.  
+   Then, the current read position is updated to `1:12` (Messages from `1:6` 
to `1:11` have yet to be dispatched to consumers).
+3. Called 
[PersistentStickyKeyDispatcherMultipleConsumers#addConsumer](https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L130-L139)
 from thread-2.  
+   Then, the new consumer is stored to the recently joined consumers with read 
position `1:12`.
+4. Called 
[PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169)
 from thread-5.  
+   Then, messages from `1:6` to `1:11` can be dispatched to the new consumer 
since the "recently joined consumers" allow brokers to send messages before the 
joined position (i.e., `1:12` here). **However, it is not expected.**  
+   For example, if existing consumers have some unacked messages, 
disconnecting, and redelivering them can cause out-of-order.
+
+An example scenario is shown below.
+
+1. Assume that the 
[entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169)
 has the following messages, and the dispatcher has two consumers (`c1` 
`messagesForC` is 1, `c2` `messagesForC` is 1000), and the selector will return 
`c1` if `key-a` and `c2` if `key-b`.
+   - `1:6`  key: `key-a`
+   - `1:7`  key: `key-a`
+   - `1:8`  key: `key-a`
+   - `1:9`  key: `key-b`
+   - `1:10` key: `key-b`
+   - `1:11` key: `key-b`
+2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`.
+   - So, the current read position is `1:12`.
+   - `c1` never acknowledges `1:6`.
+3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the 
`recentlyJoinedConsumers` is `{c3=1:12}`
+4. Send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are less than the 
recently joined consumers position, `1:12`.
+5. Disconnect `c1`.
+6. Send `1:6` to `c3`.  
+   As a result `c3` receives messages with the following order: `1:7`, `1:8`, 
`1:6` // out-of-order
+
+## [issue-2]
+Key_Shared subscription has out-of-order cases because messages could be added 
to messagesToRedeliver without consumer-side operations such as 
unacknowledgement.  
+Consider the following flow.
+
+1. Assume that,  
+   readPosition: `2:1`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 1, pending: []  
+   c2: messagesForC: 1000, pending: []  // Necessary to ensure that the 
dispatcher reads entries even if c1 has no more permits.  
+   selector: key-a: c1
+2. Dispatch `2:1` (key: `key-a`, type: Normal)  
+   readPosition: `2:2`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   selector: key-a: c1
+3. Try to dispatch `2:2` (key: `key-a`, type: Normal), but it can't be sent to 
c1 because c1 has no more permits. Then, it is added to messagesToRedeliver.  
+   readPosition: `2:3`  
+   messagesToRedeliver: [`2:2`]  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   selector: key-a: c1
+4. Add consumer c3  
+   readPosition: `2:3`  
+   messagesToRedeliver: [`2:2`]  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 1000, pending: []  
+   selector: key-a: c3  // modified
+5. Dispatch `2:2` (key: `key-a`, type: Replay) from messagesToRedeliver.  
+   readPosition: `2:3`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 999, pending: [`2:2`]  
+   selector: key-a: c3
+6. Disconnect c1 and redeliver `2:1`  
+   readPosition: `2:3`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 998, pending: [`2:2`, `2:1`]  // out-of-order  
+   selector: key-a: c3
+
+# Goals
+
+## In Scope
+
+Fix out-of-order issues above.
+
+## Out of Scope
+
+Simplify or improve the specification of Key_Shared.
+
+# High Level Design
+
+The root cause of the issues described above is that `recentlyJoinedConsumers` 
uses the "read position" as the joined position for consumers, which does not 
guarantee that messages less than or equal to it have already been scheduled to 
be sent.
+Instead, we propose to use "last sent position" as joined positions for 
consumers.
+
+Also, change (or add) some stats to know Key_Shared subscription status easily.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+First, introduce the new position, like the mark delete position and the 
individually deleted messages. In other words,
+
+- All positions less than or equal to it are already scheduled to be sent.
+- Manage individually sent positions to update the position as expected.
+
+An example of updating the individually sent messages:  
+https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2250-L2287

Review Comment:
   The code at this link does not seem to match what is being described.
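
For reference, the distinction the quoted proposal text is drawing can be sketched with the numbers from the [issue-1] scenario. This is a simplified illustration only: positions are reduced to entry IDs within ledger `1`, and `JoinCheck` and its methods are hypothetical names, not Pulsar code. The `<=` comparison for the proposed rule is an assumption based on the statement that all positions less than or equal to the last sent position are already scheduled to be sent.

```java
/** Hypothetical helper comparing the two join markers; not part of Pulsar. */
final class JoinCheck {
    private JoinCheck() {
    }

    // Current behaviour as described in the proposal: entries before the
    // read position recorded at join time may go to the new consumer.
    static boolean allowedWithReadPosition(long entryId, long joinedReadPosition) {
        return entryId < joinedReadPosition;
    }

    // Proposed behaviour (assumed comparison): only entries at or before the
    // last sent position recorded at join time may go to the new consumer.
    static boolean allowedWithLastSentPosition(long entryId, long joinedLastSentPosition) {
        return entryId <= joinedLastSentPosition;
    }
}
```

In the scenario, `c3` joins when the read position is `1:12` but only `1:6` and `1:9` - `1:11` have been sent, so the last sent position would be `1:6`. Entry `1:7` passes the first check (7 < 12) and is held back by the second (7 > 6).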



##########
pip/pip-282.md:
##########
@@ -0,0 +1,283 @@
+# Background knowledge

Review Comment:
   Can this design avoid the following scenario? If so, please describe how.
   
   **context**
   - There are 2 consumers: `c1`, `c2`
   - There are 3 messages in the backlog: `m1(k1)`, `m2(k2)`, `m3(k3)`
   
   **flow-1**
   | time | `add consumer c3` | `deliver messages to client` |
   | --- | --- | --- |
   | 1 | add consumer to the selector | |
   | 2 | | route messages: `m1(k1) -> c1`, `m2(k2) -> c2`, `m3(k3) -> c3` |
   | 3 | | the consumer `c3` is not yet included in the collection `recentlyJoinedConsumers`, so `m3` is moved into the Replay Queue |
   | 4 | mark the recently joined position `m3` for the consumer `c3` | |
   
   Then the delivery of messages would be stuck.
   
   
   
   
   



##########
pip/pip-282.md:
##########
@@ -0,0 +1,283 @@
+# Background knowledge
+
+Key_Shared is one of the subscription types which allows multiple consumer 
connections.
+Messages are distributed across consumers, and messages with the same key or 
same ordering key are delivered to only one consumer.
+No matter how many times the message is re-delivered, it is delivered to the 
same consumer.
+
+When disabling `allowOutOfOrderDelivery`, Key_Shared subscription guarantees a 
key will be processed in order by a single consumer, even if a new consumer is 
connected.
+
+# Motivation
+
+Key_Shared has a mechanism called the "recently joined consumers" to keep 
message ordering.
+However, it currently does not handle some corner cases.
+More specifically, we found two out-of-order issues caused by:
+
+1. [issue-1] The race condition in the "recently joined consumers", where 
consumers can be added before finishing reading and dispatching messages from 
ledgers.
+2. [issue-2] Messages could be added to messagesToRedeliver without 
consumer-side operations such as unacknowledgement.
+
+We should care about these cases in Key_Shared subscription.
+
+## [issue-1]
+
+Key_Shared subscription has out-of-order cases because of the race condition 
of [the "recently joined 
consumers"](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L378-L386).
  
+Consider the following flow.
+
+1. Assume that the current read position is `1:6` and the recently joined 
consumers is empty.
+2. Called 
[OpReadEntry#internalReadEntriesComplete](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L92-L95)
 from thread-1.  
+   Then, the current read position is updated to `1:12` (Messages from `1:6` 
to `1:11` have yet to be dispatched to consumers).
+3. Called 
[PersistentStickyKeyDispatcherMultipleConsumers#addConsumer](https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L130-L139)
 from thread-2.  
+   Then, the new consumer is stored to the recently joined consumers with read 
position `1:12`.
+4. Called 
[PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169)
 from thread-5.  
+   Then, messages from `1:6` to `1:11` can be dispatched to the new consumer 
since the "recently joined consumers" allow brokers to send messages before the 
joined position (i.e., `1:12` here). **However, it is not expected.**  
+   For example, if existing consumers have some unacked messages, 
disconnecting, and redelivering them can cause out-of-order.
+
+An example scenario is shown below.
+
+1. Assume that the 
[entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169)
 has the following messages, and the dispatcher has two consumers (`c1` 
`messagesForC` is 1, `c2` `messagesForC` is 1000), and the selector will return 
`c1` if `key-a` and `c2` if `key-b`.
+   - `1:6`  key: `key-a`
+   - `1:7`  key: `key-a`
+   - `1:8`  key: `key-a`
+   - `1:9`  key: `key-b`
+   - `1:10` key: `key-b`
+   - `1:11` key: `key-b`
+2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`.
+   - So, the current read position is `1:12`.
+   - `c1` never acknowledges `1:6`.
+3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the 
`recentlyJoinedConsumers` is `{c3=1:12}`
+4. Send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are less than the 
recently joined consumers position, `1:12`.
+5. Disconnect `c1`.
+6. Send `1:6` to `c3`.  
+   As a result `c3` receives messages with the following order: `1:7`, `1:8`, 
`1:6` // out-of-order
+
+## [issue-2]
+Key_Shared subscription has out-of-order cases because messages could be added 
to messagesToRedeliver without consumer-side operations such as 
unacknowledgement.  
+Consider the following flow.
+
+1. Assume that,  
+   readPosition: `2:1`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 1, pending: []  
+   c2: messagesForC: 1000, pending: []  // Necessary to ensure that the 
dispatcher reads entries even if c1 has no more permits.  
+   selector: key-a: c1
+2. Dispatch `2:1` (key: `key-a`, type: Normal)  
+   readPosition: `2:2`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   selector: key-a: c1
+3. Try to dispatch `2:2` (key: `key-a`, type: Normal), but it can't be sent to 
c1 because c1 has no more permits. Then, it is added to messagesToRedeliver.  
+   readPosition: `2:3`  
+   messagesToRedeliver: [`2:2`]  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   selector: key-a: c1
+4. Add consumer c3  
+   readPosition: `2:3`  
+   messagesToRedeliver: [`2:2`]  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 1000, pending: []  
+   selector: key-a: c3  // modified
+5. Dispatch `2:2` (key: `key-a`, type: Replay) from messagesToRedeliver.  
+   readPosition: `2:3`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 999, pending: [`2:2`]  
+   selector: key-a: c3
+6. Disconnect c1 and redeliver `2:1`  
+   readPosition: `2:3`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 998, pending: [`2:2`, `2:1`]  // out-of-order  
+   selector: key-a: c3
+
+# Goals
+
+## In Scope
+
+Fix out-of-order issues above.
+
+## Out of Scope
+
+Simplify or improve the specification of Key_Shared.
+
+# High Level Design
+
+The root cause of the issues described above is that `recentlyJoinedConsumers` 
uses the "read position" as the joined position for consumers, which does not 
guarantee that messages less than or equal to it have already been scheduled to 
be sent.
+Instead, we propose to use "last sent position" as joined positions for 
consumers.
+
+Also, change (or add) some stats to know Key_Shared subscription status easily.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+First, introduce the new position, like the mark delete position and the 
individually deleted messages. In other words,
+
+- All positions less than or equal to it are already scheduled to be sent.
+- Manage individually sent positions to update the position as expected.
+
+An example of updating the individually sent messages:  
+https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2250-L2287
+
+An example of updating the last sent position:  
+https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2311-L2321
+
+More specifically, the recently joined consumers will be as follows.
+```diff
+diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+index 8f05530f58b..c96dd50d673 100644
+--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
++++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+@@ -69,8 +69,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
+      * This means that, in order to preserve ordering, new consumers can only 
receive old
+      * messages, until the mark-delete position will move past this point.
+      */
++    // Map(key: recently joined consumer, value: last sent position when 
joining)
+     private final LinkedHashMap<Consumer, PositionImpl> 
recentlyJoinedConsumers;
+
++    // snapshot of the last sent position
++    private PositionImpl lastSentPosition;
++    private final RangeSetWrapper<PositionImpl> individuallySentPositions;

Review Comment:
   What is this for?
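
One possible reading, going by the proposal text quoted above: `lastSentPosition` plays the role of the mark-delete position and `individuallySentPositions` the role of the individually deleted messages, but tracking "scheduled to be sent" rather than "acknowledged". A minimal sketch of that bookkeeping, assuming positions are flattened to plain `long` entry IDs and a `TreeSet` stands in for `RangeSetWrapper`; `LastSentTracker` and `markSent` are hypothetical names, not Pulsar APIs:

```java
import java.util.TreeSet;

/** Hypothetical tracker; positions are simplified to plain long entry IDs. */
final class LastSentTracker {
    // Every position <= lastSentPosition has been scheduled to be sent.
    private long lastSentPosition;
    // Positions sent out of order, i.e. beyond a gap after lastSentPosition.
    private final TreeSet<Long> individuallySentPositions = new TreeSet<>();

    LastSentTracker(long initialLastSentPosition) {
        this.lastSentPosition = initialLastSentPosition;
    }

    void markSent(long position) {
        if (position <= lastSentPosition) {
            return; // already covered by the contiguous prefix
        }
        individuallySentPositions.add(position);
        // Advance over the contiguous run that now follows lastSentPosition.
        while (individuallySentPositions.remove(lastSentPosition + 1)) {
            lastSentPosition++;
        }
    }

    long getLastSentPosition() {
        return lastSentPosition;
    }
}
```

With the [issue-1] numbers, starting from 5 and marking 6, 9, 10 and 11 leaves the last sent position at 6, because 7 and 8 are still unsent. Marking 7 and 8 afterwards closes the gap and moves it to 11.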



##########
pip/pip-282.md:
##########
@@ -0,0 +1,283 @@
+# Background knowledge
+
+Key_Shared is one of the subscription types which allows multiple consumer 
connections.
+Messages are distributed across consumers, and messages with the same key or 
same ordering key are delivered to only one consumer.
+No matter how many times the message is re-delivered, it is delivered to the 
same consumer.
+
+When disabling `allowOutOfOrderDelivery`, Key_Shared subscription guarantees a 
key will be processed in order by a single consumer, even if a new consumer is 
connected.
+
+# Motivation
+
+Key_Shared has a mechanism called the "recently joined consumers" to keep 
message ordering.
+However, it currently does not handle some corner cases.
+More specifically, we found two out-of-order issues caused by:
+
+1. [issue-1] The race condition in the "recently joined consumers", where 
consumers can be added before finishing reading and dispatching messages from 
ledgers.
+2. [issue-2] Messages could be added to messagesToRedeliver without 
consumer-side operations such as unacknowledgement.
+
+We should care about these cases in Key_Shared subscription.
+
+## [issue-1]
+
+Key_Shared subscription has out-of-order cases because of the race condition 
of [the "recently joined 
consumers"](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L378-L386).
  
+Consider the following flow.
+
+1. Assume that the current read position is `1:6` and the recently joined 
consumers is empty.
+2. Called 
[OpReadEntry#internalReadEntriesComplete](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L92-L95)
 from thread-1.  
+   Then, the current read position is updated to `1:12` (Messages from `1:6` 
to `1:11` have yet to be dispatched to consumers).
+3. Called 
[PersistentStickyKeyDispatcherMultipleConsumers#addConsumer](https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L130-L139)
 from thread-2.  
+   Then, the new consumer is stored to the recently joined consumers with read 
position `1:12`.
+4. Called 
[PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169)
 from thread-5.  
+   Then, messages from `1:6` to `1:11` can be dispatched to the new consumer 
since the "recently joined consumers" allow brokers to send messages before the 
joined position (i.e., `1:12` here). **However, it is not expected.**  
+   For example, if existing consumers have some unacked messages, 
disconnecting, and redelivering them can cause out-of-order.
+
+An example scenario is shown below.
+
+1. Assume that the 
[entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169)
 has the following messages, and the dispatcher has two consumers (`c1` 
`messagesForC` is 1, `c2` `messagesForC` is 1000), and the selector will return 
`c1` if `key-a` and `c2` if `key-b`.
+   - `1:6`  key: `key-a`
+   - `1:7`  key: `key-a`
+   - `1:8`  key: `key-a`
+   - `1:9`  key: `key-b`
+   - `1:10` key: `key-b`
+   - `1:11` key: `key-b`
+2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`.
+   - So, the current read position is `1:12`.
+   - `c1` never acknowledges `1:6`.
+3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the 
`recentlyJoinedConsumers` is `{c3=1:12}`
+4. Send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are less than the 
recently joined consumers position, `1:12`.
+5. Disconnect `c1`.
+6. Send `1:6` to `c3`.  
+   As a result `c3` receives messages with the following order: `1:7`, `1:8`, 
`1:6` // out-of-order
+
+## [issue-2]
+Key_Shared subscription has out-of-order cases because messages could be added 
to messagesToRedeliver without consumer-side operations such as 
unacknowledgement.  
+Consider the following flow.
+
+1. Assume that,  
+   readPosition: `2:1`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 1, pending: []  
+   c2: messagesForC: 1000, pending: []  // Necessary to ensure that the 
dispatcher reads entries even if c1 has no more permits.  
+   selector: key-a: c1
+2. Dispatch `2:1` (key: `key-a`, type: Normal)  
+   readPosition: `2:2`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   selector: key-a: c1
+3. Try to dispatch `2:2` (key: `key-a`, type: Normal), but it can't be sent to 
c1 because c1 has no more permits. Then, it is added to messagesToRedeliver.  
+   readPosition: `2:3`  
+   messagesToRedeliver: [`2:2`]  
+   recentlyJoinedConsumers: []  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   selector: key-a: c1
+4. Add consumer c3  
+   readPosition: `2:3`  
+   messagesToRedeliver: [`2:2`]  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 1000, pending: []  
+   selector: key-a: c3  // modified
+5. Dispatch `2:2` (key: `key-a`, type: Replay) from messagesToRedeliver.  
+   readPosition: `2:3`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c1: messagesForC: 0, pending: [`2:1`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 999, pending: [`2:2`]  
+   selector: key-a: c3
+6. Disconnect c1 and redeliver `2:1`  
+   readPosition: `2:3`  
+   messagesToRedeliver: []  
+   recentlyJoinedConsumers: [c3: `2:3`]  
+   c2: messagesForC: 1000, pending: []  
+   c3: messagesForC: 998, pending: [`2:2`, `2:1`]  // out-of-order  
+   selector: key-a: c3
+
+# Goals
+
+## In Scope
+
+Fix out-of-order issues above.
+
+## Out of Scope
+
+Simplify or improve the specification of Key_Shared.
+
+# High Level Design
+
+The root cause of the issues described above is that `recentlyJoinedConsumers` 
uses the "read position" as the joined position for consumers, which does not 
guarantee that messages less than or equal to it have already been scheduled to 
be sent.
+Instead, we propose to use "last sent position" as joined positions for 
consumers.
+
+Also, change (or add) some stats to know Key_Shared subscription status easily.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+First, introduce the new position, like the mark delete position and the 
individually deleted messages. In other words,
+
+- All positions less than or equal to it are already scheduled to be sent.
+- Manage individually sent positions to update the position as expected.
+
+An example of updating the individually sent messages:  
+https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2250-L2287
+
+An example of updating the last sent position:  
+https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2311-L2321

Review Comment:
   The code at this link does not seem to match what is being described.


