RobertIndie commented on code in PR #23945:
URL: https://github.com/apache/pulsar/pull/23945#discussion_r1952376734
##########
pip/pip-406.md:
##########
@@ -0,0 +1,75 @@
+# PIP-406: Introduce pulsar_subscription_dispatch_throttled_msgs and bytes metrics
+
+# Background knowledge
+
+## Motivation
+
+Currently, users can monitor subscription backlogs using the `pulsar_subscription_back_log_no_delayed` metric.
+However, if [dispatch throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is configured at the broker/topic/subscription level,
+this metric may not accurately reflect whether the backlog is due to insufficient consumer capacity, because the backlog could instead be caused by dispatch throttling.
+
+## Goals
+
+Introduce metrics to indicate the number of `messages/bytes throttled` for a subscription. This allows users to write PromQL queries to identify subscriptions with high backlogs but low or no throttling, pinpointing backlogs caused by insufficient consumer capacity.
+
+## In Scope
+- Introduce the metric `pulsar_subscription_dispatch_throttled_msgs` to represent the total number of messages throttled for a subscription.
+- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes` to represent the total number of bytes throttled for a subscription.
+- Add `dispatchThrottledMsgs` and `dispatchThrottledBytes` fields to topic subscription stats.
+
+## Out of Scope
+- These stats are not persistent and will reset upon subscription reconnection.
+
+# High Level Design
+1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in `AbstractBaseDispatcher`. Increase these values whenever the number of messages/bytes to read is reduced during `calculateToRead`.
+2. Output these fields when retrieving topic stats and metrics.
+
+
+# Detailed Design
+
+## Design & Implementation Details
+1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in `AbstractBaseDispatcher`:
+```java
+    private final LongAdder dispatchThrottledMsgs = new LongAdder();
+    private final AtomicLong dispatchThrottledBytes = new AtomicLong();
+```
+
+2. During each [calculateToRead](https://github.com/apache/pulsar/blob/411f6973e85b0a6213e992386e1704f93d0aae42/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L371-L377),
+if the number of `messages/bytes` to read is reduced, increase these fields by the reduced amount.
+
+- `dispatchThrottledBytes` may overflow in extreme cases, so reset this value instead of letting it overflow:
+```diff
+     protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter,
+                                                        int messagesToRead, long bytesToRead) {
+         // update messagesToRead according to available dispatch rate limit.
+-        return computeReadLimits(messagesToRead,
++        Pair<Integer, Long> result = computeReadLimits(messagesToRead,
+                 (int) dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                 bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
++        if (result.getLeft() < messagesToRead) {
++            dispatchThrottledMsgs.add(messagesToRead - result.getLeft());
++        }
++        if (result.getRight() < bytesToRead) {
++            long increment = bytesToRead - result.getRight();
++            dispatchThrottledBytes.updateAndGet(current -> {
++                // Check if adding the increment would cause an overflow
++                if (Long.MAX_VALUE - current < increment) {
++                    return increment;
++                }
++                return current + increment;
++            });
++        }
++        return result;
+     }
+```
+
+## Public-facing Changes
+- None

Review Comment:
   Two new metrics have been added in this PIP. We should document them here.
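The counter accounting proposed in the diff can be exercised outside the broker. The following is a minimal standalone sketch, not the Pulsar implementation: the class `ThrottleCounterSketch` and its simplified `computeReadLimits` (which just caps each request at the available permits and returns `{msgs, bytes}` as a `long[]` instead of a `Pair`) are hypothetical stand-ins. Only the two counters and the `updateAndGet` overflow check mirror the diff.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Standalone sketch of the PIP-406 throttling counters; NOT the broker code.
class ThrottleCounterSketch {
    private final LongAdder dispatchThrottledMsgs = new LongAdder();
    private final AtomicLong dispatchThrottledBytes = new AtomicLong();

    // Hypothetical stand-in for the broker's computeReadLimits: caps each
    // request at the available permits (ignores any special-case handling
    // the real method may have).
    static long[] computeReadLimits(int messagesToRead, int availableMsgs,
                                    long bytesToRead, long availableBytes) {
        return new long[] {
            Math.min(messagesToRead, availableMsgs),
            Math.min(bytesToRead, availableBytes)
        };
    }

    // Mirrors the accounting the diff adds to updateMessagesToRead.
    long[] updateMessagesToRead(int messagesToRead, int availableMsgs,
                                long bytesToRead, long availableBytes) {
        long[] result = computeReadLimits(messagesToRead, availableMsgs,
                bytesToRead, availableBytes);
        if (result[0] < messagesToRead) {
            // Count the messages the rate limit removed from this read.
            dispatchThrottledMsgs.add(messagesToRead - result[0]);
        }
        if (result[1] < bytesToRead) {
            long increment = bytesToRead - result[1];
            // If current + increment would exceed Long.MAX_VALUE, restart the
            // counter at increment instead of wrapping to a negative value.
            dispatchThrottledBytes.updateAndGet(current ->
                    Long.MAX_VALUE - current < increment ? increment : current + increment);
        }
        return result;
    }

    long throttledMsgs() {
        return dispatchThrottledMsgs.sum();
    }

    long throttledBytes() {
        return dispatchThrottledBytes.get();
    }

    public static void main(String[] args) {
        ThrottleCounterSketch sketch = new ThrottleCounterSketch();
        // Request 100 msgs / 1000 bytes while only 40 msgs / 400 bytes are permitted.
        sketch.updateMessagesToRead(100, 40, 1_000L, 400L);
        System.out.println("throttledMsgs=" + sketch.throttledMsgs()
                + " throttledBytes=" + sketch.throttledBytes());
        // prints "throttledMsgs=60 throttledBytes=600"
    }
}
```

The byte counter is presumably an `AtomicLong` rather than a `LongAdder` because the check-and-reset needs a single atomic cell, and `LongAdder` offers no `updateAndGet`. Note that the reset makes the byte counter drop, which Prometheus `rate()`/`increase()` interpret as a counter reset.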
