This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af09a3a075f04a63ee71828fbb2224bb5a9279f9
Author: fengyubiao <[email protected]>
AuthorDate: Thu Oct 16 00:01:00 2025 +0800

    [improve][broker] Part-2 of PIP-434: Use ServerCnxThrottleTracker, instead 
of modifying channel.readable directly (#24799)
    
    (cherry picked from commit acad78c10f0c0c4756d2dd73bf729bb2f84f53e7)
---
 .../resourcegroup/ResourceGroupPublishLimiter.java |   9 +-
 .../pulsar/broker/service/AbstractTopic.java       |   8 +-
 .../pulsar/broker/service/BrokerService.java       |   8 +-
 .../org/apache/pulsar/broker/service/Producer.java |  24 -
 .../broker/service/PublishRateLimiterImpl.java     |  13 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  36 +-
 .../broker/service/ServerCnxThrottleTracker.java   | 481 +++++++++++++++++----
 .../apache/pulsar/broker/service/TransportCnx.java |  17 +-
 .../service/PublishRateLimiterDisableTest.java     |  23 +-
 .../broker/service/PublishRateLimiterTest.java     | 111 +++--
 .../service/TopicPublishRateThrottleTest.java      |  31 ++
 11 files changed, 565 insertions(+), 196 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
index fc4514db81f..04f56e0ca69 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.resourcegroup;
 import org.apache.pulsar.broker.qos.MonotonicClock;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
+import org.apache.pulsar.broker.service.ServerCnxThrottleTracker;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.ResourceGroup;
@@ -30,7 +31,13 @@ public class ResourceGroupPublishLimiter extends 
PublishRateLimiterImpl  {
     private volatile long publishMaxByteRate;
 
     public ResourceGroupPublishLimiter(ResourceGroup resourceGroup, 
MonotonicClock monotonicClock) {
-        super(monotonicClock);
+        super(monotonicClock, producer -> {
+            producer.getCnx().getThrottleTracker().markThrottled(
+                
ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate);
+        }, producer -> {
+            producer.getCnx().getThrottleTracker().unmarkThrottled(
+                
ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate);
+        });
         update(resourceGroup);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 520ece34f66..546a49952ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
+import static 
org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -194,7 +195,12 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
         updateTopicPolicyByBrokerConfig();
 
         this.lastActive = System.nanoTime();
-        topicPublishRateLimiter = new 
PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock());
+        topicPublishRateLimiter = new 
PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock(),
+            producer -> {
+                
producer.getCnx().getThrottleTracker().markThrottled(ThrottleType.TopicPublishRate);
+            }, producer -> {
+                
producer.getCnx().getThrottleTracker().unmarkThrottled(ThrottleType.TopicPublishRate);
+            });
         updateActiveRateLimiters();
 
         additionalSystemCursorNames = 
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8478e7f0510..8c4d1712b73 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -333,7 +333,13 @@ public class BrokerService implements Closeable {
         this.pulsar = pulsar;
         this.clock = pulsar.getClock();
         this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
-        this.brokerPublishRateLimiter = new 
PublishRateLimiterImpl(pulsar.getMonotonicClock());
+        this.brokerPublishRateLimiter = new 
PublishRateLimiterImpl(pulsar.getMonotonicClock(), producer -> {
+            producer.getCnx().getThrottleTracker().markThrottled(
+                    ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+        }, producer -> {
+            producer.getCnx().getThrottleTracker().unmarkThrottled(
+                    ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+        });
         this.dispatchRateLimiterFactory = 
createDispatchRateLimiterFactory(pulsar.getConfig());
         this.managedLedgerStorage = pulsar.getManagedLedgerStorage();
         this.keepAliveIntervalSeconds = 
pulsar.getConfiguration().getKeepAliveIntervalSeconds();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 0784f74591e..9d0c1080254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -899,30 +899,6 @@ public class Producer {
 
     private static final Logger log = LoggerFactory.getLogger(Producer.class);
 
-    /**
-     * This method increments a counter that is used to control the throttling 
of a connection.
-     * The connection's read operations are paused when the counter's value is 
greater than 0, indicating that
-     * throttling is in effect.
-     * It's important to note that after calling this method, it is the 
caller's responsibility to ensure that the
-     * counter is decremented by calling the {@link #decrementThrottleCount()} 
method when throttling is no longer
-     * needed on the connection.
-     */
-    public void incrementThrottleCount() {
-        cnx.incrementThrottleCount();
-    }
-
-    /**
-     * This method decrements a counter that is used to control the throttling 
of a connection.
-     * The connection's read operations are resumed when the counter's value 
is 0, indicating that
-     * throttling is no longer in effect.
-     * It's important to note that before calling this method, the caller 
should have previously
-     * incremented the counter by calling the {@link 
#incrementThrottleCount()} method when throttling
-     * was needed on the connection.
-     */
-    public void decrementThrottleCount() {
-        cnx.decrementThrottleCount();
-    }
-
     public Attributes getOpenTelemetryAttributes() {
         if (attributes != null) {
             return attributes;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 0015f2675a2..096418191dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
 import org.apache.pulsar.broker.qos.MonotonicClock;
@@ -42,9 +43,14 @@ public class PublishRateLimiterImpl implements 
PublishRateLimiter {
 
     private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
     private final AtomicBoolean processingQueuedProducers = new 
AtomicBoolean(false);
+    private final Consumer<Producer> throttleAction;
+    private final Consumer<Producer> unthrottleAction;
 
-    public PublishRateLimiterImpl(MonotonicClock monotonicClock) {
+    public PublishRateLimiterImpl(MonotonicClock monotonicClock, 
Consumer<Producer> throttleAction,
+                                  Consumer<Producer> unthrottleAction) {
         this.monotonicClock = monotonicClock;
+        this.throttleAction = throttleAction;
+        this.unthrottleAction = unthrottleAction;
     }
 
     /**
@@ -68,7 +74,7 @@ public class PublishRateLimiterImpl implements 
PublishRateLimiter {
         }
         if (shouldThrottle) {
             // throttle the producer by incrementing the throttle count
-            producer.incrementThrottleCount();
+            throttleAction.accept(producer);
             // schedule decrementing the throttle count to possibly unthrottle 
the producer after the
             // throttling period
             scheduleDecrementThrottleCount(producer);
@@ -136,7 +142,8 @@ public class PublishRateLimiterImpl implements 
PublishRateLimiter {
             while ((throttlingDuration = calculateThrottlingDurationNanos()) 
== 0L
                     && (producer = unthrottlingQueue.poll()) != null) {
                 try {
-                    producer.decrementThrottleCount();
+                    final Producer producerFinal = producer;
+                    producer.getCnx().execute(() -> 
unthrottleAction.accept(producerFinal));
                 } catch (Exception e) {
                     log.error("Failed to unthrottle producer {}", producer, e);
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1b38d91f888..aa6be2eaab0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,7 @@ import static org.apache.commons.lang3.StringUtils.EMPTY;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
+import static 
org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
 import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
 import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
@@ -286,7 +287,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (maxPendingBytesPerThread > 0 && pendingBytes > 
maxPendingBytesPerThread
                     && !limitExceeded) {
                 limitExceeded = true;
-                cnxsPerThread.get().forEach(cnx -> 
cnx.throttleTracker.setPublishBufferLimiting(true));
+                cnxsPerThread.get().forEach(cnx -> 
cnx.throttleTracker.markThrottled(
+                        ThrottleType.IOThreadMaxPendingPublishBytesExceeded));
             }
         }
 
@@ -296,7 +298,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             // we resume all connections sharing the same thread
             if (limitExceeded && pendingBytes <= 
resumeThresholdPendingBytesPerThread) {
                 limitExceeded = false;
-                cnxsPerThread.get().forEach(cnx -> 
cnx.throttleTracker.setPublishBufferLimiting(false));
+                cnxsPerThread.get().forEach(cnx -> 
cnx.throttleTracker.unmarkThrottled(
+                        ThrottleType.IOThreadMaxPendingPublishBytesExceeded));
             }
         }
     }
@@ -314,6 +317,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         Start, Connected, Failed, Connecting
     }
 
+    @Getter
     private final ServerCnxThrottleTracker throttleTracker;
 
     public ServerCnx(PulsarService pulsar) {
@@ -482,12 +486,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             log.warn("[{}] Reached rate limitation", this);
             // Stop receiving requests.
             pausedDueToRateLimitation = true;
-            ctx.channel().config().setAutoRead(false);
+            
getThrottleTracker().markThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit);
             // Resume after 1 second.
             ctx.channel().eventLoop().schedule(() -> {
                 if (pausedDueToRateLimitation) {
                     log.info("[{}] Resuming connection after rate limitation", 
this);
-                    ctx.channel().config().setAutoRead(true);
+                    
getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit);
                     pausedDueToRateLimitation = false;
                 }
             }, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS);
@@ -498,7 +502,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
         if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
             log.info("[{}] is writable, turn on channel auto-read", this);
-            ctx.channel().config().setAutoRead(true);
+            
getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionOutboundBufferFull);
             requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds, 
TimeUnit.MILLISECONDS);
         } else if (pauseReceivingRequestsIfUnwritable && 
!ctx.channel().isWritable()) {
             final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
@@ -512,7 +516,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off 
channel auto-read", this);
                 }
             }
-            ctx.channel().config().setAutoRead(false);
+            
getThrottleTracker().markThrottled(ThrottleType.ConnectionOutboundBufferFull);
         }
         ctx.fireChannelWritabilityChanged();
     }
@@ -3450,7 +3454,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     // or the pending publish bytes
     private void increasePendingSendRequestsAndPublishBytes(int msgSize) {
         if (++pendingSendRequest == maxPendingSendRequests) {
-            throttleTracker.setPendingSendRequestsExceeded(true);
+            
throttleTracker.markThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded);
         }
         
PendingBytesPerThreadTracker.getInstance().incrementPublishBytes(msgSize, 
maxPendingBytesPerThread);
     }
@@ -3475,7 +3479,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         
PendingBytesPerThreadTracker.getInstance().decrementPublishBytes(msgSize, 
resumeThresholdPendingBytesPerThread);
 
         if (--pendingSendRequest == resumeReadsThreshold) {
-            throttleTracker.setPendingSendRequestsExceeded(false);
+            
throttleTracker.unmarkThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded);
         }
 
         if (isNonPersistentTopic) {
@@ -3854,22 +3858,6 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.authRole = authRole;
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void incrementThrottleCount() {
-        throttleTracker.incrementThrottleCount();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void decrementThrottleCount() {
-        throttleTracker.decrementThrottleCount();
-    }
-
     @VisibleForTesting
     void setAuthState(AuthenticationState authState) {
         this.authState = authState;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
index 78bac024218..037612d2156 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
@@ -18,125 +18,442 @@
  */
 package org.apache.pulsar.broker.service;
 
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
 
 /**
- * Tracks the state of throttling for a connection. The throttling happens by 
pausing reads by setting
- * Netty {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to false 
for the channel (connection).
- * <p>
- * There can be multiple rate limiters that can throttle a connection. Each 
rate limiter will independently
- * call the {@link #incrementThrottleCount()} and {@link 
#decrementThrottleCount()} methods to signal that the
- * connection should be throttled or not. The connection will be throttled if 
the counter is greater than 0.
- * <p>
- * Besides the rate limiters, the connection can also be throttled if the 
number of pending publish requests exceeds
- * a configured threshold. This throttling is toggled with the {@link 
#setPendingSendRequestsExceeded} method.
- * There's also per-thread memory limits which could throttle the connection. 
This throttling is toggled with the
- * {@link #setPublishBufferLimiting} method. Internally, these two methods 
will call the
- * {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()} 
methods when the state changes.
+ * Manages and tracks throttling state for server connections in Apache Pulsar.
+ *
+ * <p>This class provides a centralized mechanism to control connection 
throttling by managing
+ * multiple throttling conditions simultaneously. When throttling is active, 
it pauses incoming
+ * requests by setting Netty's {@link 
io.netty.channel.ChannelConfig#setAutoRead(boolean)} to
+ * {@code false} for the associated channel.
+ *
+ * <h3>Throttling Mechanism</h3>
+ * <p>The tracker maintains independent counters for different types of 
throttling conditions
+ * defined in {@link ThrottleType}. A connection is considered throttled if 
any of these
+ * conditions are active (counter > 0). The connection will only resume normal 
operation
+ * when all throttling conditions are cleared.
+ *
+ * <h3>Supported Throttling Types</h3>
+ * <ul>
+ *   <li><b>Connection-level:</b> Max pending publish requests, outbound 
buffer limits</li>
+ *   <li><b>Thread-level:</b> IO thread memory limits for in-flight 
publishing</li>
+ *   <li><b>Topic-level:</b> Topic publish rate limiting</li>
+ *   <li><b>Resource Group-level:</b> Resource group publish rate limiting</li>
+ *   <li><b>Broker-level:</b> Global broker publish rate limiting</li>
+ *   <li><b>Flow Control:</b> Channel writability and cooldown rate 
limiting</li>
+ * </ul>
+ *
+ * <h3>Reentrant vs Non-Reentrant Types</h3>
+ * <p>Some throttling types support multiple concurrent activations 
(reentrant):
+ * <ul>
+ *   <li>{@link ThrottleType#TopicPublishRate} - Reentrant because multiple 
producers may share
+ *       the same rate limiter which relates to the same topic</li>
+ *   <li>{@link ThrottleType#ResourceGroupPublishRate} - Reentrant because 
multiple producers may share
+ *       the same rate limiter which relates to the same resource group</li>
+ * </ul>
+ * <p>Other types are non-reentrant and can only be activated once at a time. 
The reentrant types
+ * use counters to track how many producers are affected by the same shared 
rate limiter, while
+ * non-reentrant types use simple boolean states.
+ *
+ * <h3>Thread Safety</h3>
+ * <p>This class is designed to be used from a single thread (the connection's 
IO thread)
+ * and is not thread-safe for concurrent access from multiple threads.
+ *
+ * <h3>Usage Example</h3>
+ * <pre>{@code
+ * ServerCnxThrottleTracker tracker = new ServerCnxThrottleTracker(serverCnx);
+ *
+ * // Mark connection as throttled due to rate limiting
+ * tracker.markThrottled(ThrottleType.TopicPublishRate);
+ *
+ * // Later, when rate limiting condition is cleared
+ * tracker.unmarkThrottled(ThrottleType.TopicPublishRate);
+ * }</pre>
+ *
+ * @see ThrottleType
+ * @see ThrottleRes
+ * @see ServerCnx
  */
 @Slf4j
-final class ServerCnxThrottleTracker {
-
-    private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> 
THROTTLE_COUNT_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(
-                    ServerCnxThrottleTracker.class, "throttleCount");
-
-    private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
-            PENDING_SEND_REQUESTS_EXCEEDED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(
-                    ServerCnxThrottleTracker.class, 
"pendingSendRequestsExceeded");
-    private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> 
PUBLISH_BUFFER_LIMITING_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(
-                    ServerCnxThrottleTracker.class, "publishBufferLimiting");
+public final class ServerCnxThrottleTracker {
+
     private final ServerCnx serverCnx;
-    private volatile int throttleCount;
-    private volatile int pendingSendRequestsExceeded;
-    private volatile int publishBufferLimiting;
+    private final int[] states = new int[ThrottleType.values().length];
 
+    /**
+     * Enumeration of different throttling conditions that can be applied to a 
server connection.
+     *
+     * <p>Each type represents a specific resource constraint or rate limiting 
condition
+     * that may require throttling the connection to maintain system stability 
and fairness.
+     *
+     * <p>Some types support reentrant behavior (can be activated multiple 
times concurrently),
+     * while others are non-reentrant (single activation only).
+     */
+    public enum ThrottleType {
 
-    public ServerCnxThrottleTracker(ServerCnx serverCnx) {
-        this.serverCnx = serverCnx;
+        /**
+         * Throttling due to excessive pending publish requests on the 
connection.
+         *
+         * <p>This throttling is activated when the number of in-flight 
publish requests
+         * exceeds the configured limit. It helps prevent memory exhaustion 
and ensures
+         * fair resource allocation across connections.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         * <p><b>Configuration:</b> {@link 
ServiceConfiguration#getMaxPendingPublishRequestsPerConnection()}
+         */
+        ConnectionMaxPendingPublishRequestsExceeded(false),
+
+        /**
+         * Throttling due to excessive memory usage by in-flight publish 
operations on the IO thread.
+         *
+         * <p>This throttling is activated when the total memory used by 
pending publish operations
+         * on a shared IO thread exceeds the configured limit. Multiple 
connections may share the
+         * same IO thread, so this limit applies across all connections on 
that thread.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         * <p><b>Configuration:</b> {@link 
ServiceConfiguration#getMaxMessagePublishBufferSizeInMB()}
+         */
+        IOThreadMaxPendingPublishBytesExceeded(false),
+
+        /**
+         * Throttling due to topic-level publish rate limiting.
+         *
+         * <p>This throttling is activated when publish operations exceed the 
configured
+         * rate limits for a specific topic. Multiple producers on the same 
topic may
+         * contribute to triggering this throttling condition.
+         *
+         * <p><b>Type:</b> Reentrant (supports multiple concurrent activations)
+         * <br><b>Reason for reentrancy:</b> Multiple producers may share the 
same rate limiter
+         * which relates to the same topic. Each producer can independently 
trigger throttling
+         * when the shared topic rate limiter becomes active, requiring a 
counter to track
+         * how many producers are affected by the same rate limiter.
+         *
+         * <p><b>Configuration:</b> Topic-level publish rate policies
+         */
+        TopicPublishRate(true),
+
+        /**
+         * Throttling due to resource group-level publish rate limiting.
+         *
+         * <p>This throttling is activated when publish operations exceed the 
configured
+         * rate limits for a resource group. Resource groups allow 
fine-grained control
+         * over resource allocation across multiple topics and tenants.
+         *
+         * <p><b>Type:</b> Reentrant (supports multiple concurrent activations)
+         * <br><b>Reason for reentrancy:</b> Multiple producers may share the 
same rate limiter
+         * which relates to the same resource group. Each producer can 
independently trigger
+         * throttling when the shared resource group rate limiter becomes 
active, requiring
+         * a counter to track how many producers are affected by the same rate 
limiter.
+         *
+         * <p><b>Configuration:</b> Resource group publish rate policies
+         */
+        ResourceGroupPublishRate(true),
+
+        /**
+         * Throttling due to broker-level publish rate limiting.
+         *
+         * <p>This throttling is activated when publish operations exceed the 
global
+         * broker-level rate limits. This provides a safety mechanism to 
prevent
+         * the entire broker from being overwhelmed by publish traffic.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         * <p><b>Configuration:</b> {@link 
ServiceConfiguration#getBrokerPublisherThrottlingMaxMessageRate()}
+         * and {@link 
ServiceConfiguration#getBrokerPublisherThrottlingMaxByteRate()}
+         */
+        BrokerPublishRate(false),
 
+        /**
+         * Throttling due to channel outbound buffer being full.
+         *
+         * <p>This throttling is activated when the Netty channel's outbound 
buffer
+         * reaches its high water mark, indicating that the client cannot keep 
up
+         * with the rate of outgoing messages. This prevents memory exhaustion
+         * and provides backpressure to publishers.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         * <p><b>Reference:</b> PIP-434: Expose Netty channel configuration 
WRITE_BUFFER_WATER_MARK
+         */
+        ConnectionOutboundBufferFull(false),
+
+        /**
+         * Throttling due to connection pause/resume cooldown rate limiting.
+         *
+         * <p>This throttling is activated during cooldown periods after a 
connection
+         * has been resumed from a throttled state. It prevents rapid 
oscillation
+         * between throttled and unthrottled states.
+         *
+         * <p><b>Type:</b> Non-reentrant
+         */
+        ConnectionPauseReceivingCooldownRateLimit(false);
+
+        @Getter
+        final boolean reentrant;
+
+        ThrottleType(boolean reentrant) {
+            this.reentrant = reentrant;
+        }
     }
 
     /**
-     * See {@link Producer#incrementThrottleCount()} for documentation.
+     * Enumeration representing the result of a throttling state change 
operation.
+     *
+     * <p>This enum indicates what happened when a throttling condition was 
marked or unmarked,
+     * helping callers understand whether the overall connection state changed 
or if the
+     * operation was ignored.
      */
-    public void incrementThrottleCount() {
-        int currentThrottleCount = 
THROTTLE_COUNT_UPDATER.incrementAndGet(this);
-        if (currentThrottleCount == 1) {
-            changeAutoRead(false);
+    enum ThrottleRes {
+        /**
+         * The operation resulted in a change to the overall connection 
throttling state.
+         *
+         * <p>This occurs when:
+         * <ul>
+         *   <li>The connection transitions from unthrottled to throttled 
(first throttle type activated)</li>
+         *   <li>The connection transitions from throttled to unthrottled 
(last throttle type deactivated)</li>
+         * </ul>
+         *
+         * <p>When this result is returned, the connection's auto-read setting 
will be updated
+         * accordingly to pause or resume request processing.
+         */
+        ConnectionStateChanged,
+
+        /**
+         * The operation changed the state of the specific throttle type but 
did not affect
+         * the overall connection throttling state.
+         *
+         * <p>This occurs when:
+         * <ul>
+         *   <li>A throttle type is activated, but the connection was already 
throttled by other types</li>
+         *   <li>A throttle type is deactivated, but the connection remains 
throttled by other types</li>
+         *   <li>A reentrant throttle type's counter is incremented or 
decremented</li>
+         * </ul>
+         */
+        TypeStateChanged,
+
+        /**
+         * The operation was dropped because it would violate the throttle 
type's constraints.
+         *
+         * <p>This occurs when:
+         * <ul>
+         *   <li>Attempting to mark a non-reentrant throttle type that is 
already active</li>
+         *   <li>Attempting to unmark a throttle type that is not currently 
active</li>
+         *   <li>Attempting to unmark a reentrant throttle type with an 
invalid counter state</li>
+         * </ul>
+         */
+        Dropped
+    }
+
+    /**
+     * Checks if the connection is currently throttled by any throttle type.
+     *
+     * <p>This method examines all throttle type states and returns {@code 
true}
+     * if any of them are active (counter > 0).
+     *
+     * @return {@code true} if any throttling condition is active, {@code 
false} otherwise
+     */
+    private boolean hasThrottled() {
+        for (int stat : states) {
+            if (stat > 0) {
+                return true;
+            }
         }
+        return false;
     }
 
     /**
-     * See {@link Producer#decrementThrottleCount()} for documentation.
+     * Returns the total count of active throttling conditions across all 
types.
+     *
+     * <p>This method sums up all the individual counters for each throttle 
type,
+     * providing a measure of the overall throttling pressure on the 
connection.
+     * For reentrant types, this includes the full counter value (not just 0 
or 1).
+     *
+     * @return the total number of active throttling conditions
      */
-    public void decrementThrottleCount() {
-        int currentThrottleCount = 
THROTTLE_COUNT_UPDATER.decrementAndGet(this);
-        if (currentThrottleCount == 0) {
-            changeAutoRead(true);
+    @VisibleForTesting
+    public int throttledCount() {
+        int i = 0;
+        for (int stat : states) {
+            i += stat;
         }
+        return i;
     }
 
-    private void changeAutoRead(boolean autoRead) {
-        if (isChannelActive()) {
+    /**
+     * Marks the connection as throttled for the specified throttle type.
+     *
+     * <p>This method activates throttling for the given type and may pause 
the connection's
+     * request processing if this is the first active throttling condition. 
For reentrant
+     * types ({@link ThrottleType#TopicPublishRate} and {@link 
ThrottleType#ResourceGroupPublishRate}),
+     * this increments the counter. For non-reentrant types, this sets the 
state to active.
+     *
+     * <p>If the connection transitions from unthrottled to throttled, this 
method will
+     * set the Netty channel's auto-read to {@code false}, effectively pausing 
incoming
+     * request processing.
+     *
+     * <p>Metrics are automatically recorded to track throttling events and 
connection state changes.
+     *
+     * @param type the type of throttling condition to activate
+     * @throws IllegalArgumentException if type is null
+     *
+     * @see #unmarkThrottled(ThrottleType)
+     * @see ThrottleType
+     */
+    public void markThrottled(ThrottleType type) {
+        assert serverCnx.ctx().executor().inEventLoop() : "This method should 
be called in serverCnx.ctx().executor()";
+        ThrottleRes res = doMarkThrottled(type);
+        recordMetricsAfterThrottling(type, res);
+        if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Setting auto read to {}", 
serverCnx.toString(), autoRead);
+                log.debug("[{}] Setting auto read to false", 
serverCnx.toString());
             }
-            // change the auto read flag on the channel
-            serverCnx.ctx().channel().config().setAutoRead(autoRead);
-        }
-        // update the metrics that track throttling
-        if (autoRead) {
-            serverCnx.getBrokerService().recordConnectionResumed();
-        } else if (isChannelActive()) {
-            serverCnx.increasePublishLimitedTimesForTopics();
-            serverCnx.getBrokerService().recordConnectionPaused();
+            serverCnx.ctx().channel().config().setAutoRead(false);
         }
     }
 
-    private boolean isChannelActive() {
-        return serverCnx.isActive() && serverCnx.ctx() != null && 
serverCnx.ctx().channel().isActive();
+    /**
+     * Unmarks the connection as throttled for the specified throttle type.
+     *
+     * <p>This method deactivates throttling for the given type and may resume 
the connection's
+     * request processing if this was the last active throttling condition. 
For reentrant
+     * types ({@link ThrottleType#TopicPublishRate} and {@link 
ThrottleType#ResourceGroupPublishRate}),
+     * this decrements the counter. For non-reentrant types, this clears the 
active state.
+     *
+     * <p>If the connection transitions from throttled to unthrottled, this 
method will
+     * set the Netty channel's auto-read to {@code true}, effectively resuming 
incoming
+     * request processing.
+     *
+     * <p>Metrics are automatically recorded to track unthrottling events and 
connection state changes.
+     *
+     * @param type the type of throttling condition to deactivate
+     * @throws IllegalArgumentException if type is null
+     *
+     * @see #markThrottled(ThrottleType)
+     * @see ThrottleType
+     */
+    public void unmarkThrottled(ThrottleType type) {
+        assert serverCnx.ctx().executor().inEventLoop() : "This method should 
be called in serverCnx.ctx().executor()";
+        ThrottleRes res = doUnmarkThrottled(type);
+        recordMetricsAfterUnthrottling(type, res);
+        if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Setting auto read to true", 
serverCnx.toString());
+            }
+            serverCnx.ctx().channel().config().setAutoRead(true);
+        }
     }
 
-    public void setPublishBufferLimiting(boolean throttlingEnabled) {
-        changeThrottlingFlag(PUBLISH_BUFFER_LIMITING_UPDATER, 
throttlingEnabled);
+    /**
+     * Internal method to mark a throttle type as active without side effects.
+     *
+     * <p>This method updates the internal state for the specified throttle 
type
+     * and returns the result of the operation. It handles both reentrant and
+     * non-reentrant throttle types appropriately:
+     *
+     * <ul>
+     *   <li><b>Reentrant types:</b> Increment the counter</li>
+     *   <li><b>Non-reentrant types:</b> Set to active (1) if not already 
active</li>
+     * </ul>
+     *
+     * @param throttleType the type of throttling to mark as active
+     * @return the result of the marking operation
+     * @see ThrottleRes
+     */
+    private ThrottleRes doMarkThrottled(ThrottleType throttleType) {
+        // Two reentrant type: "TopicPublishRate" and 
"ResourceGroupPublishRate".
+        boolean throttled = hasThrottled();
+        int value = states[throttleType.ordinal()];
+        if (throttleType.isReentrant()) {
+            states[throttleType.ordinal()] = value + 1;
+        } else {
+            states[throttleType.ordinal()] = 1;
+            if (value != 0) {
+                return ThrottleRes.Dropped;
+            }
+        }
+        return throttled ? ThrottleRes.TypeStateChanged : 
ThrottleRes.ConnectionStateChanged;
     }
 
-    public void setPendingSendRequestsExceeded(boolean throttlingEnabled) {
-        boolean changed = 
changeThrottlingFlag(PENDING_SEND_REQUESTS_EXCEEDED_UPDATER, throttlingEnabled);
-        if (changed) {
-            // update the metrics that track throttling due to pending send 
requests
-            if (throttlingEnabled) {
-                serverCnx.getBrokerService().recordConnectionThrottled();
-            } else {
-                serverCnx.getBrokerService().recordConnectionUnthrottled();
+    /**
+     * Internal method to unmark a throttle type as active without side 
effects.
+     *
+     * <p>This method updates the internal state for the specified throttle 
type
+     * and returns the result of the operation. It handles both reentrant and
+     * non-reentrant throttle types appropriately:
+     *
+     * <ul>
+     *   <li><b>Reentrant types:</b> Decrement the counter</li>
+     *   <li><b>Non-reentrant types:</b> Clear active state if currently 
active</li>
+     * </ul>
+     *
+     * @param throttleType the type of throttling to mark as inactive
+     * @return the result of the unmarking operation
+     * @see ThrottleRes
+     */
+    private ThrottleRes doUnmarkThrottled(ThrottleType throttleType) {
+        int value = states[throttleType.ordinal()];
+        if (throttleType.isReentrant()) {
+            states[throttleType.ordinal()] = value - 1;
+        } else {
+            if (value != 1) {
+                return ThrottleRes.Dropped;
             }
+            states[throttleType.ordinal()] = 0;
+        }
+        return hasThrottled() ? ThrottleRes.TypeStateChanged : 
ThrottleRes.ConnectionStateChanged;
+    }
+
+    /**
+     * Records metrics after a throttling operation.
+     *
+     * <p>This method updates various broker metrics to track throttling 
events:
+     * <ul>
+     *   <li>Connection-specific throttling metrics for in-flight publishing 
limits</li>
+     *   <li>Connection pause metrics when the overall connection state 
changes</li>
+     *   <li>Topic-level publish limiting counters</li>
+     * </ul>
+     *
+     * @param type the throttle type that was activated
+     * @param res the result of the throttling operation
+     */
+    private void recordMetricsAfterThrottling(ThrottleType type, ThrottleRes 
res) {
+        if (type == ThrottleType.ConnectionMaxPendingPublishRequestsExceeded 
&& res != ThrottleRes.Dropped) {
+            serverCnx.getBrokerService().recordConnectionThrottled();
+        }
+        if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
+            serverCnx.increasePublishLimitedTimesForTopics();
+            serverCnx.getBrokerService().recordConnectionPaused();
         }
     }
 
-    private boolean 
changeThrottlingFlag(AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> 
throttlingFlagFieldUpdater,
-                                         boolean throttlingEnabled) {
-        // don't change a throttling flag if the channel is not active
-        if (!isChannelActive()) {
-            return false;
+    /**
+     * Records metrics after an unthrottling operation.
+     *
+     * <p>This method updates various broker metrics to track unthrottling 
events:
+     * <ul>
+     *   <li>Connection-specific unthrottling metrics for in-flight publishing 
limits</li>
+     *   <li>Connection resume metrics when the overall connection state 
changes</li>
+     * </ul>
+     *
+     * @param type the throttle type that was deactivated
+     * @param res the result of the unthrottling operation
+     */
+    private void recordMetricsAfterUnthrottling(ThrottleType type, ThrottleRes 
res) {
+        if (type == ThrottleType.ConnectionMaxPendingPublishRequestsExceeded 
&& res != ThrottleRes.Dropped) {
+            serverCnx.getBrokerService().recordConnectionUnthrottled();
         }
-        if (throttlingFlagFieldUpdater.compareAndSet(this, 
booleanToInt(!throttlingEnabled),
-                booleanToInt(throttlingEnabled))) {
-            if (throttlingEnabled) {
-                incrementThrottleCount();
-            } else {
-                decrementThrottleCount();
-            }
-            return true;
-        } else {
-            return false;
+        if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
+            serverCnx.getBrokerService().recordConnectionResumed();
         }
     }
 
-    private static int booleanToInt(boolean value) {
-        return value ? 1 : 0;
+    public ServerCnxThrottleTracker(ServerCnx serverCnx) {
+        this.serverCnx = serverCnx;
+    }
+
+    private boolean isChannelActive() {
+        return serverCnx.isActive() && serverCnx.ctx() != null && 
serverCnx.ctx().channel().isActive();
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index 63599f09eef..2c0b247a94b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -88,22 +88,9 @@ public interface TransportCnx {
     CompletableFuture<Optional<Boolean>> checkConnectionLiveness();
 
     /**
-     * Increments the counter that controls the throttling of the connection 
by pausing reads.
-     * The connection will be throttled while the counter is greater than 0.
-     * <p>
-     * The caller is responsible for decrementing the counter by calling 
{@link #decrementThrottleCount()}  when the
-     * connection should no longer be throttled.
+     * Get the throttle tracker for this connection.
      */
-    void incrementThrottleCount();
-
-    /**
-     * Decrements the counter that controls the throttling of the connection 
by pausing reads.
-     * The connection will be throttled while the counter is greater than 0.
-     * <p>
-     * This method should be called when the connection should no longer be 
throttled. However, the caller should have
-     * previously called {@link #incrementThrottleCount()}.
-     */
-    void decrementThrottleCount();
+    ServerCnxThrottleTracker getThrottleTracker();
 
     FeatureFlags getFeatures();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
index ec952a7ca77..3e6edb04932 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
@@ -18,10 +18,11 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class PublishRateLimiterDisableTest {
@@ -29,9 +30,23 @@ public class PublishRateLimiterDisableTest {
     // GH issue #10603
     @Test
     void shouldAlwaysAllowAcquire() {
-        PublishRateLimiter publishRateLimiter = new 
PublishRateLimiterImpl(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK);
+        PublishRateLimiter publishRateLimiter = new 
PublishRateLimiterImpl(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK,
+    producer -> {
+                producer.getCnx().getThrottleTracker().markThrottled(
+                        
ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+            }, producer -> {
+                producer.getCnx().getThrottleTracker().unmarkThrottled(
+                        
ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+            });
         Producer producer = mock(Producer.class);
+        ServerCnx serverCnx = mock(ServerCnx.class);
+        doAnswer(a -> serverCnx).when(producer).getCnx();
+        ServerCnxThrottleTracker throttleTracker = new 
ServerCnxThrottleTracker(serverCnx);
+        doAnswer(a -> throttleTracker).when(serverCnx).getThrottleTracker();
+        when(producer.getCnx()).thenReturn(serverCnx);
+        BrokerService brokerService = mock(BrokerService.class);
+        when(serverCnx.getBrokerService()).thenReturn(brokerService);
         publishRateLimiter.handlePublishThrottling(producer, 
Integer.MAX_VALUE, Long.MAX_VALUE);
-        verify(producer, never()).incrementThrottleCount();
+        Assert.assertEquals(throttleTracker.throttledCount(), 0);
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 20c1ad0a412..573e3980c73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -19,18 +19,18 @@
 
 package org.apache.pulsar.broker.service;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -47,36 +47,40 @@ public class PublishRateLimiterTest {
     private AtomicLong manualClockSource;
 
     private Producer producer;
+    private ServerCnx serverCnx;
     private PublishRateLimiterImpl publishRateLimiter;
-
-    private AtomicInteger throttleCount = new AtomicInteger(0);
+    private ServerCnxThrottleTracker throttleTracker;
+    private DefaultThreadFactory threadFactory = new 
DefaultThreadFactory("pulsar-io");
+    private EventLoop eventLoop = new DefaultEventLoop(threadFactory);
 
     @BeforeMethod
     public void setup() throws Exception {
         policies.publishMaxMessageRate = new HashMap<>();
         policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate);
         manualClockSource = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
-        publishRateLimiter = new PublishRateLimiterImpl(() -> 
manualClockSource.get());
+        publishRateLimiter = new PublishRateLimiterImpl(() -> 
manualClockSource.get(),
+                producer -> {
+                    producer.getCnx().getThrottleTracker().markThrottled(
+                            
ServerCnxThrottleTracker.ThrottleType.TopicPublishRate);
+                }, producer -> {
+            producer.getCnx().getThrottleTracker().unmarkThrottled(
+                    ServerCnxThrottleTracker.ThrottleType.TopicPublishRate);
+        });
         publishRateLimiter.update(policies, CLUSTER_NAME);
         producer = mock(Producer.class);
-        throttleCount.set(0);
-        doAnswer(a -> {
-            throttleCount.incrementAndGet();
-            return null;
-        }).when(producer).incrementThrottleCount();
-        doAnswer(a -> {
-            throttleCount.decrementAndGet();
-            return null;
-        }).when(producer).decrementThrottleCount();
-        TransportCnx transportCnx = mock(TransportCnx.class);
-        when(producer.getCnx()).thenReturn(transportCnx);
+        serverCnx = mock(ServerCnx.class);
+        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
+        doAnswer(a -> eventLoop).when(channelHandlerContext).executor();
+        doAnswer(a -> channelHandlerContext).when(serverCnx).ctx();
+        doAnswer(a -> this.serverCnx).when(producer).getCnx();
+        throttleTracker = new ServerCnxThrottleTracker(this.serverCnx);
+        doAnswer(a -> 
throttleTracker).when(this.serverCnx).getThrottleTracker();
+        when(producer.getCnx()).thenReturn(serverCnx);
         BrokerService brokerService = mock(BrokerService.class);
-        when(transportCnx.getBrokerService()).thenReturn(brokerService);
+        when(serverCnx.getBrokerService()).thenReturn(brokerService);
         EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
         when(brokerService.executor()).thenReturn(eventLoopGroup);
-        EventLoop eventLoop = mock(EventLoop.class);
         when(eventLoopGroup.next()).thenReturn(eventLoop);
-        doReturn(null).when(eventLoop).schedule(any(Runnable.class), 
anyLong(), any());
         incrementSeconds(1);
     }
 
@@ -86,38 +90,63 @@ public class PublishRateLimiterTest {
         policies.publishMaxMessageRate = null;
     }
 
+    @AfterMethod
+    public void tearDown() throws Exception {
+        eventLoop.shutdownGracefully();
+    }
+
     private void incrementSeconds(int seconds) {
         manualClockSource.addAndGet(TimeUnit.SECONDS.toNanos(seconds));
     }
 
     @Test
     public void testPublishRateLimiterImplExceed() throws Exception {
-        // increment not exceed
-        publishRateLimiter.handlePublishThrottling(producer, 5, 50);
-        assertEquals(throttleCount.get(), 0);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventLoop.execute(() -> {
+            try {
+                // increment not exceed
+                publishRateLimiter.handlePublishThrottling(producer, 5, 50);
+                assertEquals(throttleTracker.throttledCount(), 0);
 
-        incrementSeconds(1);
+                incrementSeconds(1);
 
-        // numOfMessages increment exceeded
-        publishRateLimiter.handlePublishThrottling(producer, 11, 100);
-        assertEquals(throttleCount.get(), 1);
+                // numOfMessages increment exceeded
+                publishRateLimiter.handlePublishThrottling(producer, 11, 100);
+                assertEquals(throttleTracker.throttledCount(), 1);
 
-        incrementSeconds(1);
+                incrementSeconds(1);
+
+                // msgSizeInBytes increment exceeded
+                publishRateLimiter.handlePublishThrottling(producer, 9, 110);
+                assertEquals(throttleTracker.throttledCount(), 2);
 
-        // msgSizeInBytes increment exceeded
-        publishRateLimiter.handlePublishThrottling(producer, 9, 110);
-        assertEquals(throttleCount.get(), 2);
+                future.complete(null);
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        });
+        future.get(5, TimeUnit.SECONDS);
     }
 
     @Test
-    public void testPublishRateLimiterImplUpdate() {
-        publishRateLimiter.handlePublishThrottling(producer, 11, 110);
-        assertEquals(throttleCount.get(), 1);
-
-        // update
-        throttleCount.set(0);
-        publishRateLimiter.update(newPublishRate);
-        publishRateLimiter.handlePublishThrottling(producer, 11, 110);
-        assertEquals(throttleCount.get(), 0);
+    public void testPublishRateLimiterImplUpdate() throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventLoop.execute(() -> {
+            try {
+                publishRateLimiter.handlePublishThrottling(producer, 11, 110);
+                assertEquals(throttleTracker.throttledCount(), 1);
+
+                // update
+                throttleTracker = new ServerCnxThrottleTracker(serverCnx);
+                publishRateLimiter.update(newPublishRate);
+                publishRateLimiter.handlePublishThrottling(producer, 11, 110);
+                assertEquals(throttleTracker.throttledCount(), 0);
+
+                future.complete(null);
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        });
+        future.get(5, TimeUnit.SECONDS);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
index 40bcb19ab0c..929350b599e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
@@ -73,6 +73,37 @@ public class TopicPublishRateThrottleTest extends 
BrokerTestBase{
         pulsarClient.close();
     }
 
+    @Test
+    public void testResumeEvenProducerClosed() throws Exception {
+        PublishRate publishRate = new PublishRate(1, 10);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+        org.apache.pulsar.client.api.Producer<byte[]> producer = 
pulsarClient.newProducer()
+                .topic(topic).create();
+
+        Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        MessageId messageId = null;
+        // first will be success, and the second will fail, will set auto read 
to false.
+        messageId = producer.sendAsync(new byte[10]).get(500, 
TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(messageId);
+        // second will be blocked
+        producer.sendAsync(new byte[10]);
+
+        // Verify: even through the producer was closed before the unblock, 
the state should be unblocked at the next
+        // period of rate limiter.
+        producer.close();
+        Thread.sleep(3000);
+        org.apache.pulsar.client.api.Producer<byte[]> producer2 = 
pulsarClient.newProducer()
+                .topic(topic).create();
+        producer2.sendAsync(new byte[2]).get(500, TimeUnit.MILLISECONDS);
+
+        // Close the PulsarClient gracefully to avoid ByteBuf leak
+        pulsarClient.close();
+    }
+
     @Test
     public void testSystemTopicPublishNonBlock() throws Exception {
         super.baseSetup();

Reply via email to