codelipenghui commented on code in PR #23309:
URL: https://github.com/apache/pulsar/pull/23309#discussion_r1767372924


##########
pip/pip-379.md:
##########
@@ -0,0 +1,115 @@
+# PIP-379: Key_Shared Draining Hashes for Improved Message Ordering
+
+## Background Knowledge
+
+Apache Pulsar's Key_Shared subscription mode is designed to provide ordered 
message delivery on a per-key basis while allowing multiple consumers to 
process messages concurrently. This mode is particularly useful in scenarios 
where maintaining message order for specific keys is crucial, but overall 
throughput can be improved by parallelizing message consumption across multiple 
consumers.
+
+Key concepts:
+- **Key_Shared subscription**: A subscription mode that maintains message 
ordering per key while allowing multiple consumers.
+- **Hash ranges**: In AUTO_SPLIT mode, the hash space is divided among active 
consumers to distribute message processing.
+- **Pending messages**: Messages that have been sent to a consumer but not yet 
acknowledged (also called "pending acks" or "unacknowledged messages").
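To make the "hash ranges" concept concrete, here is a minimal sketch that splits a fixed hash space into contiguous ranges, one per consumer. It is a simplification under stated assumptions: the 65536-slot hash space, the use of `zlib.crc32` as the key hash, and the helper names are illustrative only. The broker's actual hash function and range-assignment strategy may differ.

```python
from __future__ import annotations

import zlib

# Assumed size of the hash space (illustrative only).
HASH_SPACE = 65536


def key_hash(key: str) -> int:
    """Map a message key to a slot in [0, HASH_SPACE). CRC32 is a stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % HASH_SPACE


def split_ranges(consumers: list[str]) -> list[tuple[int, int, str]]:
    """Divide the hash space into contiguous inclusive ranges, one per consumer."""
    n = len(consumers)
    return [
        (i * HASH_SPACE // n, (i + 1) * HASH_SPACE // n - 1, consumer)
        for i, consumer in enumerate(consumers)
    ]


def consumer_for_key(key: str, ranges: list[tuple[int, int, str]]) -> str:
    """Return the consumer whose range contains the key's hash."""
    h = key_hash(key)
    for start, end, consumer in ranges:
        if start <= h <= end:
            return consumer
    raise ValueError("hash space is not fully covered")
```

Going from two to three consumers moves the range boundaries, so some keys change owner while messages with those keys may still be unacknowledged. That transition is what the ordering contract has to cover.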
+
+### Current contract of preserving ordering
+
+The Key_Shared subscription is described in the [Pulsar 
documentation](https://pulsar.apache.org/docs/concepts-messaging/#key_shared).
+
+For this PIP, the most important detail is the "Preserving order of 
processing" section.
+There are recent changes in this section that apply to the master branch of 
Pulsar and, therefore, to the upcoming Pulsar 4.0. The changes were made as 
part of ["PIP-282: Change definition of the recently joined consumers 
position"](https://github.com/apache/pulsar/blob/master/pip/pip-282.md).
+
+[PIP-282 (master branch / Pulsar 4.0) version of the "Preserving order of 
processing" 
section](https://pulsar.apache.org/docs/next/concepts-messaging/#preserving-order-of-processing):
+
+> Key_Shared Subscription type guarantees a key will be processed by a 
*single* consumer at any given time. When a new consumer is connected, some 
keys will change their mapping from existing consumers to the new consumer. 
Once the connection has been established, the broker will record the current 
`lastSentPosition` and associate it with the new consumer. The 
`lastSentPosition` is a marker indicating that messages have been dispatched to 
the consumers up to this point. The broker will start delivering messages to 
the new consumer *only* when all messages up to the `lastSentPosition` have 
been acknowledged. This will guarantee that a certain key is processed by a 
single consumer at any given time. The trade-off is that if one of the existing 
consumers is stuck and no time-out was defined (acknowledging for you), the new 
consumer won't receive any messages until the stuck consumer resumes or gets 
disconnected.
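The PIP-282 rule quoted above can be modelled in a few lines. This is a hypothetical sketch, not the broker's code: positions are assumed to be `(ledger_id, entry_id)` tuples compared lexicographically, and "all messages up to the `lastSentPosition` have been acknowledged" is assumed to mean that the mark-delete position has reached the recorded position.

```python
from typing import Dict, Tuple

# A position is modelled as (ledger_id, entry_id); tuples compare
# lexicographically, which matches the order of entries in a topic.
Position = Tuple[int, int]


class RecentlyJoinedGate:
    """Hypothetical model of the PIP-282 rule for recently joined consumers."""

    def __init__(self) -> None:
        self.last_sent: Position = (-1, -1)
        self.joined_at: Dict[str, Position] = {}

    def on_dispatch(self, position: Position) -> None:
        # lastSentPosition: the highest position dispatched to any consumer.
        self.last_sent = max(self.last_sent, position)

    def on_consumer_joined(self, consumer: str) -> None:
        # Associate the current lastSentPosition with the new consumer.
        self.joined_at[consumer] = self.last_sent

    def can_deliver_to(self, consumer: str, mark_delete: Position) -> bool:
        # mark_delete: every message up to and including it is acknowledged.
        joined = self.joined_at.get(consumer)
        if joined is None:
            return True  # not a recently joined consumer
        return mark_delete >= joined
```

A single unacknowledged message below the recorded position (for example, on a stuck consumer) keeps the mark-delete position from advancing, so `can_deliver_to` stays False for the new consumer regardless of the key. This is the trade-off the quoted documentation describes.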
+
+[Previous version (applies to Pulsar 3.x) of the "Preserving order of 
processing" 
section](https://pulsar.apache.org/docs/3.3.x/concepts-messaging/#preserving-order-of-processing):
+
+> Key Shared Subscription type guarantees a key will be processed by a 
*single* consumer at any given time. When a new consumer is connected, some 
keys will change their mapping from existing consumers to the new consumer. 
Once the connection has been established, the broker will record the current 
read position and associate it with the new consumer. The read position is a 
marker indicating that messages have been dispatched to the consumers up to 
this point, and after it, no messages have been dispatched yet. The broker will 
start delivering messages to the new consumer *only* when all messages up to 
the read position have been acknowledged. This will guarantee that a certain 
key is processed by a single consumer at any given time. The trade-off is that 
if one of the existing consumers is stuck and no time-out was defined 
(acknowledging for you), the new consumer won't receive any messages until the 
stuck consumer resumes or gets disconnected.
+
+## Motivation
+
+The current implementation of Key_Shared subscriptions faces several 
challenges:
+
+1. **Complex Contract of Preserving Ordering**: The current contract of 
preserving ordering is hard to understand and contains a fundamental problem. 
It explains a solution and then ties the guarantee to that solution, so it 
could be read as saying that the guarantee holds only as long as the solution 
is able to handle the case.
+2. **Incomplete Ordering Contract Fulfillment**: The current contract seems to 
make a conditional guarantee that a certain key is processed by a single 
consumer at any given time. Outside of the solution described in the contract, 
the current implementation struggles to consistently prevent messages from 
being sent to another consumer while messages with the same key are still 
pending on the original consumer. While Key_Shared subscriptions aim to 
preserve message ordering per key, the current implementation may not always 
achieve this, especially during consumer changes. A potential corner case is 
reported in [issue #23307](https://github.com/apache/pulsar/issues/23307).
+3. **Usability Issues**: Understanding the current system and detecting the 
reason why messages get blocked is time-consuming and difficult.
+4. **Unnecessary Message Blocking**: The current implementation blocks 
delivery for all messages when any hash range is blocked, even if other keys 
could be processed independently. This leads to suboptimal utilization of 
consumers and increased latency for messages that could otherwise be processed.
+5. **Observability Challenges**: The current implementation lacks clear 
visibility into the consuming state when processing gets stuck, making it 
harder to build automation for detecting and mitigating issues.
+6. **Complexity**: The existing solution for managing "recently joined 
consumers" is overly complex, making the system harder to maintain and debug.
+
+## Goals
+
+### In Scope
+
+- Clarify and fulfill the key-ordered message delivery contract for Key_Shared 
AUTO_SPLIT mode.
+- Fix current issues where messages are sent out of order or where a single 
key is outstanding on multiple consumers at the same time.
+- Improve the handling of unacknowledged messages to prevent indefinite 
blocking and consumers getting stuck.
+- Minimize memory usage for pending message tracking, eliminating PIP-282's 
"sent positions" tracking.
+- Implement a new "draining hashes" concept to efficiently manage message 
ordering in Key_Shared subscriptions.
+- Enhance the reliability, usability, and scalability of Key_Shared 
subscriptions.
+- Improve observability of Key_Shared subscriptions to aid in troubleshooting 
and automation.
+- Ensure strict ordering guarantees for messages with the same key, even 
during consumer changes.
+
+### Out of Scope
+
+- Changes to other subscription types (Exclusive, Failover, Shared).
+- Adding support for key-based ordering guarantees when negative 
acknowledgements are used.
+
+## High-Level Design
+
+### Updated contract of preserving ordering
+
+The "Preserving order of processing" section of the Key_Shared documentation 
would be updated to contain this contract:
+
+In Key_Shared subscriptions, messages with the same key are delivered to, and 
allowed to remain unacknowledged on, only one consumer at a time.
+
+When new consumers join or leave, the consumer handling a message key can 
change when the default AUTO_SPLIT mode is used, but only after all pending 
messages for a particular key are acknowledged or the original consumer 
disconnects.
+
+The Key_Shared subscription doesn't prevent using any methods in the consumer 
API. For example, the application might call `negativeAcknowledge` or the 
`redeliverUnacknowledgedMessages` method. When messages are scheduled for 
redelivery due to these methods, they will be redelivered as soon as possible. 
There's no ordering guarantee in these cases; however, the guarantee of 
delivering a message key to a single consumer at a time will continue to be 
preserved.
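One way to make the updated contract testable is to check it as an invariant over a trace of delivery, acknowledgement, and disconnect events. The sketch below is a hypothetical test oracle, not part of Pulsar. It only checks the "one consumer at a time" part of the contract; ordering is not checked, which matches the relaxed guarantee for negative acknowledgements and redeliveries.

```python
from collections import defaultdict
from typing import Dict


class KeyOwnershipChecker:
    """Test oracle for the proposed contract: a key may have unacknowledged
    messages on at most one consumer at a time."""

    def __init__(self) -> None:
        # key -> consumer -> number of unacknowledged messages with that key
        self.pending: Dict[str, Dict[str, int]] = defaultdict(dict)

    def delivered(self, key: str, consumer: str) -> None:
        others = [c for c, n in self.pending[key].items() if n > 0 and c != consumer]
        if others:
            raise AssertionError(
                f"key {key!r} is still pending on {others}; "
                f"delivery to {consumer!r} violates the contract"
            )
        self.pending[key][consumer] = self.pending[key].get(consumer, 0) + 1

    def acknowledged(self, key: str, consumer: str) -> None:
        self.pending[key][consumer] -= 1

    def disconnected(self, consumer: str) -> None:
        # A disconnected consumer's pending messages no longer pin its keys.
        for per_consumer in self.pending.values():
            per_consumer.pop(consumer, None)
```

A key can move to another consumer only after its pending messages are acknowledged or the original consumer disconnects, which are the two transitions described in the contract.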
+
+### Future work is needed for supporting key-based ordering with negative 
acknowledgements
+
+The updated contract explicitly states that there is no key-based ordering 
guarantee when negative acknowledgements are used. Changing this is out of 
scope for PIP-379. A potential future solution would be to modify the client so 
that when a message is negatively acknowledged, it would also reject all 
further messages with the same key until the original message gets 
redelivered. It's already possible to attempt to implement this in client-side 
code. However, a proper solution would require broker-side support for blocking 
further delivery of a specific key while there are pending negatively 
acknowledged messages for it, until all negatively acknowledged messages for 
that key have been acknowledged by the consumer. A future implementation could 
build upon PIP-379 concepts such as "draining hashes" and extend them to cover 
the negative acknowledgement scenarios.
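The client-side attempt mentioned in this section can be sketched as a small per-key gate. This is a hypothetical helper, not part of the Pulsar client API. It assumes the application calls `on_negative_ack` whenever it negatively acknowledges a message, and also negatively acknowledges every message for which `should_process` returns False.

```python
from collections.abc import Hashable
from typing import Dict


class NackKeyGate:
    """Hypothetical client-side helper: after a message with some key is
    negatively acknowledged, reject later messages with that key until the
    original message is redelivered."""

    def __init__(self) -> None:
        # key -> id of the negatively acknowledged message we are waiting for
        self.waiting_for: Dict[str, Hashable] = {}

    def on_negative_ack(self, key: str, message_id: Hashable) -> None:
        # Only the first negatively acknowledged message per key is tracked.
        self.waiting_for.setdefault(key, message_id)

    def should_process(self, key: str, message_id: Hashable) -> bool:
        """Return True to process the message, False if the caller should
        negatively acknowledge it as well."""
        expected = self.waiting_for.get(key)
        if expected is None:
            return True
        if message_id == expected:
            # The original message came back: unblock the key.
            del self.waiting_for[key]
            return True
        return False
```

This also shows why a client-only approach is limited: once the key is unblocked, a newer message with the same key can still overtake a rejected message whose redelivery is in flight. Preventing that requires the broker to hold back the key, as the text notes.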
+
+### High-Level implementation plan
+
+The proposed solution introduces a "draining hashes" concept to efficiently 
manage message ordering in Key_Shared subscriptions:
+
+1. When consumer hash ranges change (e.g., a consumer joins or leaves), 
affected hashes of pending messages are added to a "draining hashes" set.
+2. Subsequent messages whose hashes are in the "draining hashes" set are 
blocked from delivery until the pending messages are processed.
+3. A reference counter tracks pending messages for each hash in the "draining 
hashes" set.
+4. As pending messages are removed, either because they are acknowledged or 
because their consumer disconnects, the reference counter is decremented.
+5. When the reference counter reaches zero, the hash is removed from the set, 
allowing new message delivery.
+6. Consumer hash assignments may change multiple times, and a draining hash 
might be reassigned to the original consumer. The draining hash data structure 
records the draining consumer. When delivery of a message is attempted, the 
system can check whether the target consumer is the same as the draining 
consumer. If they match, there's no need to block the hash, and the 
implementation should also remove that hash from the draining hashes set. 
This "lazy" approach avoids actively scanning all draining hashes whenever 
hash assignments change.
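The six steps of the plan can be sketched as a small single-threaded model. This is illustrative only: the class and method names are hypothetical, pending messages are assumed to be summarised as a per-consumer map of hash to unacknowledged count, and `owner_of` stands in for whatever selector maps a hash to its current consumer.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class DrainingHash:
    consumer: str   # consumer that still holds pending messages for this hash
    ref_count: int  # number of those pending messages


class DrainingHashesTracker:
    """Simplified model of the "draining hashes" idea, not the broker code."""

    def __init__(self) -> None:
        self.draining: Dict[int, DrainingHash] = {}

    def on_ranges_changed(self, pending: Dict[str, Dict[int, int]],
                          owner_of: Callable[[int], str]) -> None:
        """Steps 1 and 3: pending maps consumer -> {hash: unacked count}."""
        for consumer, per_hash in pending.items():
            for h, count in per_hash.items():
                if count > 0 and owner_of(h) != consumer and h not in self.draining:
                    self.draining[h] = DrainingHash(consumer, count)

    def can_deliver(self, h: int, target: str) -> bool:
        """Steps 2 and 6: block a hash while another consumer drains it."""
        entry = self.draining.get(h)
        if entry is None:
            return True
        if entry.consumer == target:
            # Lazy cleanup: the hash was reassigned to its draining consumer.
            del self.draining[h]
            return True
        return False

    def on_removed_from_pending(self, h: int, consumer: str) -> None:
        """Steps 4 and 5: call once per message acked or dropped on disconnect."""
        entry = self.draining.get(h)
        if entry is None or entry.consumer != consumer:
            return
        entry.ref_count -= 1
        if entry.ref_count == 0:
            del self.draining[h]
```

The state is one entry per draining hash rather than one per sent position, and when `draining` is empty `can_deliver` returns True without further checks, which corresponds to the steady state in which no extra state needs to be maintained.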
+
+This approach will meet the updated contract of preserving ordering while 
minimizing the impact on performance and memory usage. The tracking only comes 
into play during transition states. When consumers have been connected for a 
longer duration and all draining hashes have been removed, there won't be a 
need to check any special rules or maintain any extra state.
+
+## Public-facing Changes
+
+### Topic Stats Changes & Observability
+
+Topic stats for the removed PIP-282 "recently joined consumers"/"last sent 
position" solution are removed:
+- `lastSentPositionWhenJoining` field for each consumer
+- `consumersAfterMarkDeletePosition` field for each Key_Shared subscription

Review Comment:
   That is a breaking change: we changed the REST API without any smooth 
transition. The field `consumersAfterMarkDeletePosition` will be deleted, and 
some users might have built metrics or other tooling on this field. We need to 
keep the default behavior consistent for all the APIs exposed to users. We can 
add more fields, but we should not remove existing ones or change their 
meaning.
   
   There should be no breaking changes between 3.0 and 4.0; we promised users 
that they can upgrade from 3.0 to 4.0 directly without any compatibility 
issues. 
https://pulsar.apache.org/contribute/release-policy/#compatibility-between-releases.
 


