codelipenghui commented on code in PR #23309: URL: https://github.com/apache/pulsar/pull/23309#discussion_r1767320207
########## pip/pip-379.md: ########## @@ -0,0 +1,115 @@ +# PIP-379: Key_Shared Draining Hashes for Improved Message Ordering + +## Background Knowledge + +Apache Pulsar's Key_Shared subscription mode is designed to provide ordered message delivery on a per-key basis while allowing multiple consumers to process messages concurrently. This mode is particularly useful in scenarios where maintaining message order for specific keys is crucial, but overall throughput can be improved by parallelizing message consumption across multiple consumers. + +Key concepts: +- **Key_Shared subscription**: A subscription mode that maintains message ordering per key while allowing multiple consumers. +- **Hash ranges**: In AUTO_SPLIT mode, the hash space is divided among active consumers to distribute message processing. +- **Pending messages**: Messages that have been sent to a consumer but not yet acknowledged (also called "pending acks" or "unacknowledged messages"). + +### Current contract of preserving ordering + +The Key_Shared subscription is described in the [Pulsar documentation](https://pulsar.apache.org/docs/concepts-messaging/#key_shared). + +For this PIP, the most important detail is the "Preserving order of processing" section. +There are recent changes in this section that apply to the master branch of Pulsar and, therefore, to the upcoming Pulsar 4.0. The changes were made as part of ["PIP-282: Change definition of the recently joined consumers position"](https://github.com/apache/pulsar/blob/master/pip/pip-282.md). + +[PIP-282 (master branch / Pulsar 4.0) version of the "Preserving order of processing" section](https://pulsar.apache.org/docs/next/concepts-messaging/#preserving-order-of-processing): + +> Key_Shared Subscription type guarantees a key will be processed by a *single* consumer at any given time. When a new consumer is connected, some keys will change their mapping from existing consumers to the new consumer. 
Once the connection has been established, the broker will record the current `lastSentPosition` and associate it with the new consumer. The `lastSentPosition` is a marker indicating that messages have been dispatched to the consumers up to this point. The broker will start delivering messages to the new consumer *only* when all messages up to the `lastSentPosition` have been acknowledged. This will guarantee that a certain key is processed by a single consumer at any given time. The trade-off is that if one of the existing consumers is stuck and no time-out was defined (acknowledging for you), the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected. + +[Previous version (applies to Pulsar 3.x) of the "Preserving order of processing" section](https://pulsar.apache.org/docs/3.3.x/concepts-messaging/#preserving-order-of-processing): + +> Key Shared Subscription type guarantees a key will be processed by a *single* consumer at any given time. When a new consumer is connected, some keys will change their mapping from existing consumers to the new consumer. Once the connection has been established, the broker will record the current read position and associate it with the new consumer. The read position is a marker indicating that messages have been dispatched to the consumers up to this point, and after it, no messages have been dispatched yet. The broker will start delivering messages to the new consumer *only* when all messages up to the read position have been acknowledged. This will guarantee that a certain key is processed by a single consumer at any given time. The trade-off is that if one of the existing consumers is stuck and no time-out was defined (acknowledging for you), the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected. + +## Motivation + +The current implementation of Key_Shared subscriptions faces several challenges: + +1. 
**Complex Contract of Preserving Ordering**: The current contract of preserving ordering is hard to understand and contains a fundamental problem. It explains a solution and then ties the guarantee to the provided solution. It could be interpreted that there's a guarantee as long as this solution is able to handle the case. +2. **Incomplete Ordering Contract Fulfillment**: The current contract seems to make a conditional guarantee that a certain key is processed by a single consumer at any given time. Outside of the described solution in the contract, the current implementation struggles to consistently prevent messages from being sent to another consumer while pending on the original consumer. While Key_Shared subscriptions aim to preserve message ordering per key, the current implementation may not always achieve this, especially during consumer changes. There's a potential corner case reported in [issue #23307](https://github.com/apache/pulsar/issues/23307). +3. **Usability Issues**: Understanding the current system and detecting the reason why messages get blocked is time-consuming and difficult. +4. **Unnecessary Message Blocking**: The current implementation blocks delivery for all messages when any hash range is blocked, even if other keys could be processed independently. This leads to suboptimal utilization of consumers and increased latency for messages that could otherwise be processed. +5. **Observability Challenges**: The current implementation lacks clear visibility into the consuming state when processing gets stuck, making it harder to build automation for detecting and mitigating issues. +6. **Complexity**: The existing solution for managing "recently joined consumers" is overly complex, making the system harder to maintain and debug. + +## Goals + +### In Scope + +- Clarify and fulfill the key-ordered message delivery contract for Key_Shared AUTO_SPLIT mode. 
+- Fix current issues where messages are sent out-of-order or when a single key is outstanding in multiple consumers at a time. +- Improve the handling of unacknowledged messages to prevent indefinite blocking and consumers getting stuck. +- Minimize memory usage for pending message tracking, eliminating PIP-282's "sent positions" tracking. +- Implement a new "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions. +- Enhance the reliability, usability, and scalability of Key_Shared subscriptions. +- Improve observability of Key_Shared subscriptions to aid in troubleshooting and automation. +- Ensure strict ordering guarantees for messages with the same key, even during consumer changes. + +### Out of Scope + +- Changes to other subscription types (Exclusive, Failover, Shared). +- Adding support for key-based ordering guarantees when negative acknowledgements are used. + +## High-Level Design + +### Updated contract of preserving ordering + +The "Preserving order of processing" section of the Key_Shared documentation would be updated to contain this contract: + +In Key_Shared subscriptions, messages with the same key are delivered to, and allowed to remain unacknowledged on, only one consumer at a time. + +When consumers join or leave, the consumer handling a message key can change when the default AUTO_SPLIT mode is used, but only after all pending messages for that key are acknowledged or the original consumer disconnects. + +The Key_Shared subscription doesn't prevent using any methods in the consumer API. For example, the application might call `negativeAcknowledge` or the `redeliverUnacknowledgedMessages` method. When messages are scheduled for delivery due to these methods, they will get redelivered as soon as possible. There's no ordering guarantee in these cases; however, the guarantee of delivering a message key to a single consumer at a time will continue to be preserved.
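The updated contract above can be illustrated with a small, self-contained sketch. This is not Pulsar broker code: the class `KeyOwnershipSketch` and its methods `tryDeliver`, `acknowledge`, and `disconnect` are hypothetical names invented for this illustration, and the sketch tracks raw keys rather than hashes. It only shows the rule that a key may be pending on one consumer at a time and can move to another consumer once its pending messages are acknowledged or the original consumer disconnects.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the ordering contract; not Pulsar's implementation. */
class KeyOwnershipSketch {
    // key -> consumer that currently holds unacknowledged messages for that key
    private final Map<String, String> owner = new HashMap<>();
    // key -> number of unacknowledged messages held by the owning consumer
    private final Map<String, Integer> pending = new HashMap<>();

    /** Records a delivery and returns true only if the contract allows it. */
    boolean tryDeliver(String key, String consumer) {
        String current = owner.get(key);
        if (current != null && !current.equals(consumer)) {
            return false; // key is still pending on another consumer
        }
        owner.put(key, consumer);
        pending.merge(key, 1, Integer::sum);
        return true;
    }

    /** Acknowledges one message; the key is released when nothing is pending. */
    void acknowledge(String key, String consumer) {
        if (!consumer.equals(owner.get(key))) {
            return; // ignore acks from a consumer that does not own the key
        }
        if (pending.merge(key, -1, Integer::sum) <= 0) {
            pending.remove(key);
            owner.remove(key);
        }
    }

    /** A disconnect releases every key that was pending on the consumer. */
    void disconnect(String consumer) {
        owner.entrySet().removeIf(e -> {
            if (e.getValue().equals(consumer)) {
                pending.remove(e.getKey());
                return true;
            }
            return false;
        });
    }
}
```

In this sketch, a second consumer asking for a key that is still pending elsewhere is simply refused; a real dispatcher would instead hold the message back and retry later.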
+ +### Future work needed for supporting key-based ordering with negative acknowledgements + +The updated contract explicitly states that it is not possible to retain key-based ordering of messages when negative acknowledgements are used. Changing this is out of scope for PIP-379. A potential future solution for handling this would be to modify the client so that when a message is negatively acknowledged, it would also reject all further messages with the same key until the original message gets redelivered. It's already possible to attempt to implement this in client-side code. However, a proper solution would require support on the broker side to block further delivery of the specific key when there are pending negatively acknowledged messages until all negatively acknowledged messages for that particular key have been acknowledged by the consumer. A future implementation to address these problems could build upon PIP-379 concepts such as "draining hashes" and extend them to cover the negative acknowledgement scenarios. + +### High-Level implementation plan + +The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions: + +1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set. +2. Subsequent messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed. +3. A reference counter tracks pending messages for each hash in the "draining hashes" set. +4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented. +5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. +6. 
Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer. The draining hash data structure contains information about the draining consumer. When delivery of a message is attempted, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change. + +This approach will meet the updated contract of preserving ordering while minimizing the impact on performance and memory usage. The tracking only comes into play during transition states. When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. + +## Public-facing Changes + +### Topic Stats Changes & Observability + +Topic stats for the removed PIP-282 "recently joined consumers"/"last sent position" solution are removed: +- `lastSentPositionWhenJoining` field for each consumer +- `consumersAfterMarkDeletePosition` field for each Key_Shared subscription

Review Comment:
This is a breaking change for existing users in how they troubleshoot Key_Shared subscription issues today.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
