shibd commented on code in PR #23945:
URL: https://github.com/apache/pulsar/pull/23945#discussion_r1957474468


##########
pip/pip-406.md:
##########
@@ -0,0 +1,75 @@
+# PIP-406: Introduce pulsar_subscription_dispatch_throttled_msgs and bytes 
metrics 
+
+# Background knowledge
+
+## Motivation
+
+Currently, users can monitor subscription backlogs using the 
`pulsar_subscription_back_log_no_delayed` metric. 
+However, if [dispatch 
throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is 
configured at the broker/topic/subscription level,
+this metric may not accurately reflect whether the backlog is due to 
insufficient consumer capacity, as it could be caused by dispatch throttling.
+
+## Goals
+
+Introduce metrics to indicate the number of `messages/bytes throttled` for a 
subscription. This allows users to write PromQL queries to identify 
subscriptions with high backlogs but low or no throttling, pinpointing backlogs 
caused by insufficient consumer capacity.
+
+## In Scope
+- Introduce the metric `pulsar_subscription_dispatch_throttled_msgs` to 
represent the total number of messages throttled for a subscription.
+- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes` to 
represent the total number of bytes throttled for a subscription.
+- Add `dispatchThrottledMsgs` and `dispatchThrottledBytes` fields to topic 
subscription stats.
+
+## Out of Scope
+- These states are not persistent and will reset upon subscription 
reconnection.
+
+# High Level Design
+1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in 
`AbstractBaseDispatcher`. Increase these values whenever the number of 
messages/bytes is reduced during `calculateToRead`.
+2. Output these fields when retrieving topic stats and metrics.
+
+
+# Detailed Design
+
+## Design & Implementation Details
+1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in 
`AbstractBaseDispatcher`:
+```java
+    private final LongAdder dispatchThrottledMsgs = new LongAdder();
+    private final AtomicLong dispatchThrottledBytes = new AtomicLong();
+```
+
+2. During each 
[calculateToRead](https://github.com/apache/pulsar/blob/411f6973e85b0a6213e992386e1704f93d0aae42/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L371-L377),
+if the number of `messages/bytes` is reduced, increase these fields 
accordingly.
+
+- dispatchThrottledBytes may overflow in extreme cases, so reset this value 
before overflow:
+```diff
+     protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter 
dispatchRateLimiter,
+                                                        int messagesToRead, 
long bytesToRead) {
+         // update messagesToRead according to available dispatch rate limit.
+-        return computeReadLimits(messagesToRead,
++        Pair<Integer, Long> result = computeReadLimits(messagesToRead,
+                 (int) 
dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                 bytesToRead, 
dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
++        if (result.getLeft() < messagesToRead) {
++            dispatchThrottledMsgs.add(messagesToRead - result.getLeft());
++        }
++        if (result.getRight() < bytesToRead) {
++            long increment = bytesToRead - result.getRight();
++            dispatchThrottledBytes.updateAndGet(current -> {
++                // Check if adding the increment would cause an overflow
++                if (Long.MAX_VALUE - current < increment) {
++                    return increment;
++                }
++                return current + increment;
++            });
++        }
++        return result;
+     }
+```
+
+## Public-facing Changes
+- None

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to