codelipenghui commented on code in PR #24400:
URL: https://github.com/apache/pulsar/pull/24400#discussion_r2146544121

##########
pip/pip-426.md:
##########
@@ -0,0 +1,198 @@
+
+# PIP-426: Enhanced Consumer Throttling and Unacknowledged Message Tracking for Exclusive and Failover Subscriptions
+
+# Background knowledge
+
+In [13618](https://github.com/apache/pulsar/pull/48), we have added a configuration named maxUnackedMessagesPerConsumer to restrict consumer for receiving messages without acknowledging-msg up to the threshold.
+
+# Motivation
+
+Apache Pulsar currently lacks full support for enforcing unacknowledged message limits and consumer-side flow control in exclusive and failover subscriptions. While these mechanisms function correctly for shared subscriptions, their absence in exclusive/failover modes causes critical limitations:
+
+1. **Throttling Inaccuracies**
+   Consumers can exceed configured `maxUnackedMessagesOnConsumer` limits, risking resource exhaustion.
+
+2. **Tracking Deficiencies**
+   `pendingAcks` tracking is disabled or partially functional, compromising visibility into message processing.
+
+3. **Inaccurate unacked message count**
+   Current implementation lacks support for:
+   - Cumulative acknowledgements
+   - Message batching
+   - Transactional operations
+
+
+This proposal addresses these gaps by extending the `pendingAcks` system to support all subscription types.
+
+# Goals
+
+## In Scope
+
+1. **Strict Throttling Enforcement**
+   Apply `maxUnackedMessagesOnConsumer` limits to exclusive and failover subscriptions.
+
+2. **Unified Tracking Mechanism**
+   Enable `pendingAcks` for accurate unacknowledged message tracking across all subscription types.
+
+3. **Accurate unacked message count**
+   Support cumulative acknowledgements, message batching, and transactional operations.
+
+4. **Feature Flag**
+   Add a feature flag to enable/disable the new feature.
+
+## Out of Scope
+- No guarantee that messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time in exclusive/failover subscriptions.
+- No changes to the existing public API or wire protocol.
+- No change in behavior for shared subscriptions.
+
+# High Level Design
+
+1. Remove the `Subscription.isIndividualAckMode()` restriction that limits `pendingAcks` usage to shared subscriptions.
+2. Extend `PersistentAcknowledgmentsGroupingTracker` for exclusive/failover consumers.
+3. Enhance flow control in `PersistentDispatcherSingleActiveConsumer`
+
+# Detailed Design
+
+## Design & Implementation Details

Review Comment:
   Indeed, each message in a Pulsar topic can have a persistent, monotonically increasing local index, which functions much like a Kafka offset. This feature was officially introduced by Pull Request #9039, which implemented PIP-70 ("Introduce lightweight broker entry metadata").

   With this local index, the number of unacknowledged messages for a consumer can be calculated with a simple subtraction:

   `unacked_messages = last_delivered_index - mark_delete_position`

   Given its reliability and proven value, enabling this index by default in a future Apache Pulsar release would be reasonable.
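
   A minimal sketch of the subtraction described in this comment, assuming both values are already available as PIP-70 entry indexes (the mark-delete position resolved to the index of the last acknowledged message). The class `UnackedEstimate` and the method `unackedMessages` are hypothetical names used for illustration only; they are not existing Pulsar APIs.

```java
// Hypothetical helper, not part of Pulsar: illustrates the index subtraction only.
final class UnackedEstimate {
    private UnackedEstimate() {
    }

    /**
     * Estimates the number of unacknowledged messages for a consumer.
     *
     * @param lastDeliveredIndex index of the last message delivered to the consumer (assumed input)
     * @param markDeleteIndex    index of the last message covered by the mark-delete position (assumed input)
     * @return lastDeliveredIndex - markDeleteIndex
     */
    static long unackedMessages(long lastDeliveredIndex, long markDeleteIndex) {
        // Delivery cannot lag behind acknowledgement, so reject inconsistent inputs.
        if (lastDeliveredIndex < markDeleteIndex) {
            throw new IllegalArgumentException(
                    "lastDeliveredIndex must be >= markDeleteIndex");
        }
        return lastDeliveredIndex - markDeleteIndex;
    }
}
```

   For example, with a last delivered index of 120 and a mark-delete index of 100, messages 101 through 120 are outstanding, so the estimate is 20. This only holds for cumulative acknowledgement, where everything up to the mark-delete position is acknowledged and nothing after it is.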
