lhotari opened a new pull request, #23429: URL: https://github.com/apache/pulsar/pull/23429
### Motivation

[PIP-379: Key_Shared Draining Hashes for Improved Message Ordering](https://github.com/apache/pulsar/blob/master/pip/pip-379.md) was implemented in #23352. One of the major benefits of PIP-379 is its easy-to-understand model of when a hash is blocked.

When a new consumer is added, hash range assignments move from existing consumers to the new consumer. (In some cases, hash range assignments can also move between existing consumers after a consumer is added or removed.) The PIP-379 implementation ensures that no new messages with a hash in a moved hash range can be delivered until all unacknowledged messages for that hash are cleared, either by acknowledgements or by the consumer disconnecting. This applies to the AUTO_SPLIT ordered mode of the Key_Shared subscription type.

PIP-379 introduces the concept of "draining hashes", which is now reflected in the consumer stats. This internal detail is exposed intentionally, since users need this information to understand why messages don't get delivered. Since there's no mapping between external and internal concepts, the abstraction isn't leaky. Users don't need to know how draining hashes are implemented, but they do need to know that the consumer is blocked on unacknowledged messages for a specific hash range. This is all relevant information and contains no unnecessary implementation details.

This PR contains the consumer stats changes that present this information clearly.
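To make the blocking rule above concrete, here is a deliberately simplified Java model of how draining hashes could be tracked per hash. It is a sketch under stated assumptions and not Pulsar's broker implementation: the class `ToyDrainingHashes` and all of its methods are hypothetical, and it only models the behavior described above (block other consumers while unacknowledged messages remain, and clear on full acknowledgement or disconnect).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the PIP-379 blocking rule; not the broker's code.
final class ToyDrainingHashes {
    private static final class Entry {
        final String consumer;   // consumer that still holds unacked messages for the hash
        int unackMsgs;           // remaining unacknowledged messages for the hash
        int blockedAttempts;     // deliveries to other consumers that were skipped

        Entry(String consumer, int unackMsgs) {
            this.consumer = consumer;
            this.unackMsgs = unackMsgs;
        }
    }

    private final Map<Integer, Entry> draining = new HashMap<>();
    private final Map<String, Integer> clearedTotals = new HashMap<>();

    // A hash range moved away from `previousOwner` while it had `unacked` messages for `hash`.
    void startDraining(int hash, String previousOwner, int unacked) {
        if (unacked > 0) {
            draining.putIfAbsent(hash, new Entry(previousOwner, unacked));
        }
    }

    // Delivery of `hash` to any consumer other than the draining one is blocked.
    boolean canDeliver(int hash, String consumer) {
        Entry e = draining.get(hash);
        if (e == null || e.consumer.equals(consumer)) {
            return true;
        }
        e.blockedAttempts++;
        return false;
    }

    // The draining consumer acknowledged one message for `hash`.
    void acknowledged(int hash, String consumer) {
        Entry e = draining.get(hash);
        if (e != null && e.consumer.equals(consumer) && --e.unackMsgs == 0) {
            draining.remove(hash);
            clearedTotals.merge(consumer, 1, Integer::sum);
        }
    }

    // A disconnecting consumer releases all hashes it was draining.
    void disconnected(String consumer) {
        draining.values().removeIf(e -> e.consumer.equals(consumer));
    }

    // Per-consumer counterparts of the new stats fields.
    int drainingHashesCount(String consumer) {
        return (int) draining.values().stream().filter(e -> e.consumer.equals(consumer)).count();
    }

    int drainingHashesUnackedMessages(String consumer) {
        return draining.values().stream()
                .filter(e -> e.consumer.equals(consumer))
                .mapToInt(e -> e.unackMsgs)
                .sum();
    }

    int drainingHashesClearedTotal(String consumer) {
        return clearedTotals.getOrDefault(consumer, 0);
    }

    int blockedAttempts(int hash) {
        Entry e = draining.get(hash);
        return e == null ? 0 : e.blockedAttempts;
    }
}
```

In this model, a hash with 2 unacknowledged messages on `c1` blocks delivery to `c2` (incrementing `blockedAttempts` each time) until `c1` acknowledges both messages, at which point the hash is removed and counted in `c1`'s cleared total.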
### Modifications

Added consumer-level stats:
- `drainingHashesCount` - the current number of hashes in the draining state for this consumer
- `drainingHashesClearedTotal` - the total number of hashes cleared from the draining state since the consumer connected
- `drainingHashesUnackedMessages` - the total number of unacknowledged messages for all draining hashes for this consumer
- `drainingHashes` - draining hashes information for this consumer
  - `hash` - the sticky key hash which is draining
  - `unackMsgs` - the number of unacknowledged messages for this hash
  - `blockedAttempts` - the number of times the hash has blocked an attempted delivery of a message

In addition:
- `keyHashRangeArrays` - the consumer's hash range assignments as a list of lists, where each inner list contains the start and end of a range as elements
  - example `[ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ]`

This field had to be added under the new name `keyHashRangeArrays` because there is already an existing `keyHashRanges` field with a different format. Changing the existing field isn't possible since it would break compatibility: a newer admin client couldn't read stats from an older broker, and vice versa. The previous `keyHashRanges` field is now deprecated.

Example of both fields, where the difference in format is visible:

```json
{
  "keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
  "keyHashRanges" : [ "[2960, 5968]", "[22258, 43033]", "[49261, 54464]", "[55155, 61273]" ]
}
```

The `keyHashRanges` field contains the information as a list of string values, which isn't very usable for most use cases since the values must be parsed before they can be used.

The stats will continue to contain `keyHashRanges` and `readPositionWhenJoining` when the "classic" (3.3.x) implementation of Key_Shared is enabled by configuring `subscriptionKeySharedUseClassicPersistentImplementation=true` ("classic" support was added in #23424).
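The parsing step that the string form of `keyHashRanges` requires can be sketched in plain Java. This is a minimal sketch assuming each value has exactly the `"[start, end]"` form shown in the example above; the class `KeyHashRangeParser` is hypothetical and does not use the Pulsar admin client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: converts the deprecated string form, e.g. "[2960, 5968]",
// into the numeric [start, end] pairs that keyHashRangeArrays already provides.
final class KeyHashRangeParser {
    static List<int[]> parse(List<String> keyHashRanges) {
        List<int[]> result = new ArrayList<>();
        for (String range : keyHashRanges) {
            String s = range.trim();
            if (!s.startsWith("[") || !s.endsWith("]")) {
                throw new IllegalArgumentException("Unexpected range format: " + range);
            }
            // Strip the brackets and split "2960, 5968" into its two bounds.
            String[] parts = s.substring(1, s.length() - 1).split(",");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Unexpected range format: " + range);
            }
            result.add(new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())});
        }
        return result;
    }
}
```

Reading `keyHashRangeArrays` directly makes this step unnecessary, since its values are already numbers.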
In the default configuration, `keyHashRanges` and `readPositionWhenJoining` are removed from the topic stats output, but the client continues to support both fields for backward and forward compatibility.

### Example of consumer stats for a subscription

```json
{
  "consumers" : [ {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "bytesOutCounter" : 1560,
    "msgOutCounter" : 30,
    "msgRateRedeliver" : 0.0,
    "messageAckRate" : 0.0,
    "chunkedMessageRate" : 0.0,
    "consumerName" : "c1",
    "availablePermits" : 70,
    "unackedMessages" : 30,
    "avgMessagesPerEntry" : 1,
    "blockedConsumerOnUnackedMsgs" : false,
    "drainingHashesCount" : 5,
    "drainingHashesClearedTotal" : 0,
    "drainingHashesUnackedMessages" : 10,
    "drainingHashes" : [
      { "hash" : 2862, "unackMsgs" : 2, "blockedAttempts" : 5 },
      { "hash" : 11707, "unackMsgs" : 2, "blockedAttempts" : 9 },
      { "hash" : 15786, "unackMsgs" : 2, "blockedAttempts" : 6 },
      { "hash" : 43539, "unackMsgs" : 2, "blockedAttempts" : 6 },
      { "hash" : 45436, "unackMsgs" : 2, "blockedAttempts" : 9 }
    ],
    "address" : "/127.0.0.1:55829",
    "connectedSince" : "2024-10-10T05:39:39.077284+03:00",
    "clientVersion" : "Pulsar-Java-v4.0.0-SNAPSHOT",
    "lastAckedTimestamp" : 0,
    "lastConsumedTimestamp" : 1728527979411,
    "lastConsumedFlowTimestamp" : 1728527979106,
    "keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
    "metadata" : { },
    "lastAckedTime" : "1970-01-01T02:00:00+02:00",
    "lastConsumedTime" : "2024-10-10T05:39:39.411+03:00"
  }, {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "bytesOutCounter" : 0,
    "msgOutCounter" : 0,
    "msgRateRedeliver" : 0.0,
    "messageAckRate" : 0.0,
    "chunkedMessageRate" : 0.0,
    "consumerName" : "c2",
    "availablePermits" : 1000,
    "unackedMessages" : 0,
    "avgMessagesPerEntry" : 0,
    "blockedConsumerOnUnackedMsgs" : false,
    "drainingHashesCount" : 0,
    "drainingHashesClearedTotal" : 0,
    "drainingHashesUnackedMessages" : 0,
    "drainingHashes" : [ ],
    "address" : "/127.0.0.1:55829",
    "connectedSince" : "2024-10-10T05:39:39.294216+03:00",
    "clientVersion" : "Pulsar-Java-v4.0.0-SNAPSHOT",
    "lastAckedTimestamp" : 0,
    "lastConsumedTimestamp" : 0,
    "lastConsumedFlowTimestamp" : 1728527979297,
    "keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ],
    "metadata" : { },
    "lastAckedTime" : "1970-01-01T02:00:00+02:00",
    "lastConsumedTime" : "1970-01-01T02:00:00+02:00"
  } ]
}
```

Relevant information for consumer c1:

```json
{
  "drainingHashesCount" : 5,
  "drainingHashesClearedTotal" : 0,
  "drainingHashesUnackedMessages" : 10,
  "drainingHashes" : [
    { "hash" : 2862, "unackMsgs" : 2, "blockedAttempts" : 5 },
    { "hash" : 11707, "unackMsgs" : 2, "blockedAttempts" : 9 },
    { "hash" : 15786, "unackMsgs" : 2, "blockedAttempts" : 6 },
    { "hash" : 43539, "unackMsgs" : 2, "blockedAttempts" : 6 },
    { "hash" : 45436, "unackMsgs" : 2, "blockedAttempts" : 9 }
  ]
}
```

Relevant information about consumer c2 in this case:

```json
{
  "keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ]
}
```

The PIP-379 implementation blocks only the hashes that need to be blocked, and for each blocked hash, detailed information is available to find out why delivery is blocked. The major difference from the previous `readPositionWhenJoining` solution is that this makes it possible to automate troubleshooting and to build CLI and web user interface tools that assist users, making it easy to troubleshoot issues when message delivery is blocked by unacknowledged messages in Key_Shared subscriptions.

Client-side tooling could already use the information provided in this PR to determine which consumer is blocked by a hash when there are multiple consumers. In the above example, the hash `2862` is contained in c2's hash range `[1, 2959]`, which means that the 2 unacknowledged messages for that hash held by consumer `c1` are preventing further messages with hash `2862` from being delivered to consumer `c2`.
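The lookup such tooling would perform can be sketched in Java. This is a minimal sketch that hard-codes the `keyHashRangeArrays` values and c1's draining hashes from the example above instead of fetching them through the admin API; the class `BlockedConsumerFinder` is hypothetical, and ranges are treated as inclusive on both ends, as the adjacent ranges in the example (`[1, 2959]`, `[2960, 5968]`) suggest.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tool: finds which consumer is blocked by each draining hash, using only
// keyHashRangeArrays (inclusive [start, end] pairs) and the "hash" values in drainingHashes.
final class BlockedConsumerFinder {
    // Returns the consumer whose assigned ranges contain `hash`, or null if none does.
    static String ownerOf(int hash, Map<String, int[][]> keyHashRangeArrays) {
        for (Map.Entry<String, int[][]> e : keyHashRangeArrays.entrySet()) {
            for (int[] range : e.getValue()) {
                if (hash >= range[0] && hash <= range[1]) {
                    return e.getKey();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, int[][]> ranges = new LinkedHashMap<>();
        ranges.put("c1", new int[][] {{2960, 5968}, {22258, 43033}, {49261, 54464}, {55155, 61273}});
        ranges.put("c2", new int[][] {{1, 2959}, {5969, 22257}, {43034, 49260}, {54465, 55154}, {61274, 65535}});
        // Draining hashes reported for consumer c1 in the example stats.
        for (int hash : List.of(2862, 11707, 15786, 43539, 45436)) {
            System.out.println("hash " + hash + " blocks delivery to " + ownerOf(hash, ranges));
        }
    }
}
```

With the example data, every one of c1's five draining hashes falls inside one of c2's ranges, so each line reports `c2` as the blocked consumer.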
The `blockedAttempts` field contains a counter that is incremented each time the dispatcher skips delivering a message to a consumer because of this hash. This information alone makes it convenient to observe Key_Shared AUTO_SPLIT subscriptions and find the causes of blocked deliveries.

A future improvement will add a REST API for looking up the message IDs of the unacknowledged messages for a hash. With that information, it will be possible to find out the details of the messages that are blocking a particular hash.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [x] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
