lhotari commented on code in PR #24799:
URL: https://github.com/apache/pulsar/pull/24799#discussion_r2428091890


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java:
##########
@@ -18,125 +18,442 @@
  */
 package org.apache.pulsar.broker.service;
 
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
 
 /**
- * Tracks the state of throttling for a connection. The throttling happens by 
pausing reads by setting
- * Netty {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to false 
for the channel (connection).
- * <p>
- * There can be multiple rate limiters that can throttle a connection. Each 
rate limiter will independently
- * call the {@link #incrementThrottleCount()} and {@link 
#decrementThrottleCount()} methods to signal that the
- * connection should be throttled or not. The connection will be throttled if 
the counter is greater than 0.
- * <p>
- * Besides the rate limiters, the connection can also be throttled if the 
number of pending publish requests exceeds
- * a configured threshold. This throttling is toggled with the {@link 
#setPendingSendRequestsExceeded} method.
- * There's also per-thread memory limits which could throttle the connection. 
This throttling is toggled with the
- * {@link #setPublishBufferLimiting} method. Internally, these two methods 
will call the
- * {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()} 
methods when the state changes.
+ * Manages and tracks throttling state for server connections in Apache Pulsar.
+ *
+ * <p>This class provides a centralized mechanism to control connection 
throttling by managing
+ * multiple throttling conditions simultaneously. When throttling is active, 
it pauses incoming
+ * requests by setting Netty's {@link 
io.netty.channel.ChannelConfig#setAutoRead(boolean)} to
+ * {@code false} for the associated channel.
+ *
+ * <h3>Throttling Mechanism</h3>
+ * <p>The tracker maintains independent counters for different types of 
throttling conditions
+ * defined in {@link ThrottleType}. A connection is considered throttled if 
any of these
+ * conditions are active (counter > 0). The connection will only resume normal 
operation
+ * when all throttling conditions are cleared.
+ *
+ * <h3>Supported Throttling Types</h3>
+ * <ul>
+ *   <li><b>Connection-level:</b> Max pending publish requests, outbound 
buffer limits</li>
+ *   <li><b>Thread-level:</b> IO thread memory limits for in-flight 
publishing</li>
+ *   <li><b>Topic-level:</b> Topic publish rate limiting</li>
+ *   <li><b>Resource Group-level:</b> Resource group publish rate limiting</li>
+ *   <li><b>Broker-level:</b> Global broker publish rate limiting</li>
+ *   <li><b>Flow Control:</b> Channel writability and cooldown rate 
limiting</li>
+ * </ul>
+ *
+ * <h3>Reentrant vs Non-Reentrant Types</h3>
+ * <p>Some throttling types support multiple concurrent activations 
(reentrant):
+ * <ul>
+ *   <li>{@link ThrottleType#TopicPublishRate} - Reentrant because multiple 
producers may share
+ *       the same rate limiter which relates to the same topic</li>
+ *   <li>{@link ThrottleType#ResourceGroupPublishRate} - Reentrant because 
multiple producers may share
+ *       the same rate limiter which relates to the same resource group</li>
+ * </ul>
+ * <p>Other types are non-reentrant and can only be activated once at a time. 
The reentrant types
+ * use counters to track how many producers are affected by the same shared 
rate limiter, while
+ * non-reentrant types use simple boolean states.
+ *
+ * <h3>Thread Safety</h3>
+ * <p>This class is designed to be used from a single thread (the connection's 
IO thread)
+ * and is not thread-safe for concurrent access from multiple threads.
+ *
+ * <h3>Usage Example</h3>
+ * <pre>{@code
+ * ServerCnxThrottleTracker tracker = new ServerCnxThrottleTracker(serverCnx);
+ *
+ * // Mark connection as throttled due to rate limiting
+ * tracker.markThrottled(ThrottleType.TopicPublishRate);
+ *
+ * // Later, when rate limiting condition is cleared
+ * tracker.unmarkThrottled(ThrottleType.TopicPublishRate);
+ * }</pre>
+ *
+ * @see ThrottleType
+ * @see ThrottleRes
+ * @see ServerCnx
  */
 @Slf4j
-final class ServerCnxThrottleTracker {
-
-    private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> 
THROTTLE_COUNT_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(
-                    ServerCnxThrottleTracker.class, "throttleCount");
-
-    private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
-            PENDING_SEND_REQUESTS_EXCEEDED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(
-                    ServerCnxThrottleTracker.class, 
"pendingSendRequestsExceeded");
-    private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> 
PUBLISH_BUFFER_LIMITING_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(
-                    ServerCnxThrottleTracker.class, "publishBufferLimiting");
+public final class ServerCnxThrottleTracker {
+
     private final ServerCnx serverCnx;
-    private volatile int throttleCount;
-    private volatile int pendingSendRequestsExceeded;
-    private volatile int publishBufferLimiting;
+    private final int[] states = new int[ThrottleType.values().length];
 
+    /**
+     * Enumeration of different throttling conditions that can be applied to a 
server connection.
+     *
+     * <p>Each type represents a specific resource constraint or rate limiting 
condition
+     * that may require throttling the connection to maintain system stability 
and fairness.
+     *
+     * <p>Some types support reentrant behavior (can be activated multiple 
times concurrently),
+     * while others are non-reentrant (single activation only).
+     */
+    public static enum ThrottleType {
 
-    public ServerCnxThrottleTracker(ServerCnx serverCnx) {
-        this.serverCnx = serverCnx;
+        /**
+         * Throttling due to excessive pending publish requests on the 
connection.
+         *
+         * <p>This throttling is activated when the number of in-flight 
publish requests
+         * exceeds the configured limit. It helps prevent memory exhaustion 
and ensures
+         * fair resource allocation across connections.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         * <p><b>Configuration:</b> {@link 
ServiceConfiguration#getMaxPendingPublishRequestsPerConnection()}
+         */
+        ConnectionMaxQuantityOfInFlightPublishing(false),
+
+        /**
+         * Throttling due to excessive memory usage by in-flight publish 
operations on the IO thread.
+         *
+         * <p>This throttling is activated when the total memory used by 
pending publish operations
+         * on a shared IO thread exceeds the configured limit. Multiple 
connections may share the
+         * same IO thread, so this limit applies across all connections on 
that thread.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         * <p><b>Configuration:</b> {@link 
ServiceConfiguration#getMaxMessagePublishBufferSizeInMB()}
+         */
+        IOThreadMaxBytesOfInFlightPublishing(false),

Review Comment:
   The name `IOThreadMaxBytesOfInFlightPublishing` isn't great. It's better to 
more closely aligned with the configuration setting 
`maxMessagePublishBufferSizeInMB`. 
   For example `IOThreadMaxPendingPublishBytesExceeded`. This would also be 
aligned with the possible new name 
`ConnectionMaxPendingPublishRequestsExceeded`.
   (could also omit `Exceeded` suffixes)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to