This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af09a3a075f04a63ee71828fbb2224bb5a9279f9 Author: fengyubiao <[email protected]> AuthorDate: Thu Oct 16 00:01:00 2025 +0800 [improve][broker] Part-2 of PIP-434: Use ServerCnxThrottleTracker, instead of modifying channel.readable directly (#24799) (cherry picked from commit acad78c10f0c0c4756d2dd73bf729bb2f84f53e7) --- .../resourcegroup/ResourceGroupPublishLimiter.java | 9 +- .../pulsar/broker/service/AbstractTopic.java | 8 +- .../pulsar/broker/service/BrokerService.java | 8 +- .../org/apache/pulsar/broker/service/Producer.java | 24 - .../broker/service/PublishRateLimiterImpl.java | 13 +- .../apache/pulsar/broker/service/ServerCnx.java | 36 +- .../broker/service/ServerCnxThrottleTracker.java | 481 +++++++++++++++++---- .../apache/pulsar/broker/service/TransportCnx.java | 17 +- .../service/PublishRateLimiterDisableTest.java | 23 +- .../broker/service/PublishRateLimiterTest.java | 111 +++-- .../service/TopicPublishRateThrottleTest.java | 31 ++ 11 files changed, 565 insertions(+), 196 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index fc4514db81f..04f56e0ca69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.resourcegroup; import org.apache.pulsar.broker.qos.MonotonicClock; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; +import org.apache.pulsar.broker.service.ServerCnxThrottleTracker; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.ResourceGroup; @@ -30,7 +31,13 @@ public class ResourceGroupPublishLimiter extends PublishRateLimiterImpl { private volatile long publishMaxByteRate; public ResourceGroupPublishLimiter(ResourceGroup resourceGroup, MonotonicClock monotonicClock) { - super(monotonicClock); + super(monotonicClock, producer -> { + producer.getCnx().getThrottleTracker().markThrottled( + ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate); + }, producer -> { + producer.getCnx().getThrottleTracker().unmarkThrottled( + ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate); + }); update(resourceGroup); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 520ece34f66..546a49952ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; +import static org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -194,7 +195,12 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { updateTopicPolicyByBrokerConfig(); this.lastActive = System.nanoTime(); - topicPublishRateLimiter = new PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock()); + topicPublishRateLimiter = new PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock(), + producer -> { + producer.getCnx().getThrottleTracker().markThrottled(ThrottleType.TopicPublishRate); + }, producer -> { + producer.getCnx().getThrottleTracker().unmarkThrottled(ThrottleType.TopicPublishRate); + }); updateActiveRateLimiters(); additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8478e7f0510..8c4d1712b73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -333,7 +333,13 @@ public class BrokerService implements Closeable { this.pulsar = pulsar; this.clock = pulsar.getClock(); this.dynamicConfigurationMap = prepareDynamicConfigurationMap(); - this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicClock()); + this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicClock(), producer -> { + producer.getCnx().getThrottleTracker().markThrottled( + ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate); + }, producer -> { + producer.getCnx().getThrottleTracker().unmarkThrottled( + ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate); + }); this.dispatchRateLimiterFactory = createDispatchRateLimiterFactory(pulsar.getConfig()); this.managedLedgerStorage = pulsar.getManagedLedgerStorage(); this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 0784f74591e..9d0c1080254 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -899,30 +899,6 @@ public class Producer { private static final Logger log = LoggerFactory.getLogger(Producer.class); - /** - * This method increments a counter that is used to control the throttling of a connection. - * The connection's read operations are paused when the counter's value is greater than 0, indicating that - * throttling is in effect. - * It's important to note that after calling this method, it is the caller's responsibility to ensure that the - * counter is decremented by calling the {@link #decrementThrottleCount()} method when throttling is no longer - * needed on the connection. - */ - public void incrementThrottleCount() { - cnx.incrementThrottleCount(); - } - - /** - * This method decrements a counter that is used to control the throttling of a connection. - * The connection's read operations are resumed when the counter's value is 0, indicating that - * throttling is no longer in effect. - * It's important to note that before calling this method, the caller should have previously - * incremented the counter by calling the {@link #incrementThrottleCount()} method when throttling - * was needed on the connection. - */ - public void decrementThrottleCount() { - cnx.decrementThrottleCount(); - } - public Attributes getOpenTelemetryAttributes() { if (attributes != null) { return attributes; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 0015f2675a2..096418191dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.qos.MonotonicClock; @@ -42,9 +43,14 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { private final AtomicInteger throttledProducersCount = new AtomicInteger(0); private final AtomicBoolean processingQueuedProducers = new AtomicBoolean(false); + private final Consumer<Producer> throttleAction; + private final Consumer<Producer> unthrottleAction; - public PublishRateLimiterImpl(MonotonicClock monotonicClock) { + public PublishRateLimiterImpl(MonotonicClock monotonicClock, Consumer<Producer> throttleAction, + Consumer<Producer> unthrottleAction) { this.monotonicClock = monotonicClock; + this.throttleAction = throttleAction; + this.unthrottleAction = unthrottleAction; } /** @@ -68,7 +74,7 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { } if (shouldThrottle) { // throttle the producer by incrementing the throttle count - producer.incrementThrottleCount(); + throttleAction.accept(producer); // schedule decrementing the throttle count to possibly unthrottle the producer after the // throttling period scheduleDecrementThrottleCount(producer); @@ -136,7 +142,8 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { while ((throttlingDuration = calculateThrottlingDurationNanos()) == 0L && (producer = unthrottlingQueue.poll()) != null) { try { - producer.decrementThrottleCount(); + final Producer producerFinal = producer; + producer.getCnx().execute(() -> unthrottleAction.accept(producerFinal)); } catch (Exception e) { log.error("Failed to unthrottle producer {}", producer, e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1b38d91f888..aa6be2eaab0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -25,6 +25,7 @@ import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync; import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; +import static org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; @@ -286,7 +287,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (maxPendingBytesPerThread > 0 && pendingBytes > maxPendingBytesPerThread && !limitExceeded) { limitExceeded = true; - cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.setPublishBufferLimiting(true)); + cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.markThrottled( + ThrottleType.IOThreadMaxPendingPublishBytesExceeded)); } } @@ -296,7 +298,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // we resume all connections sharing the same thread if (limitExceeded && pendingBytes <= resumeThresholdPendingBytesPerThread) { limitExceeded = false; - cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.setPublishBufferLimiting(false)); + cnxsPerThread.get().forEach(cnx -> cnx.throttleTracker.unmarkThrottled( + ThrottleType.IOThreadMaxPendingPublishBytesExceeded)); } } } @@ -314,6 +317,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { Start, Connected, Failed, Connecting } + @Getter private final ServerCnxThrottleTracker throttleTracker; public ServerCnx(PulsarService pulsar) { @@ -482,12 +486,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { log.warn("[{}] Reached rate limitation", this); // Stop receiving requests. pausedDueToRateLimitation = true; - ctx.channel().config().setAutoRead(false); + getThrottleTracker().markThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit); // Resume after 1 second. ctx.channel().eventLoop().schedule(() -> { if (pausedDueToRateLimitation) { log.info("[{}] Resuming connection after rate limitation", this); - ctx.channel().config().setAutoRead(true); + getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit); pausedDueToRateLimitation = false; } }, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS); @@ -498,7 +502,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) { log.info("[{}] is writable, turn on channel auto-read", this); - ctx.channel().config().setAutoRead(true); + getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionOutboundBufferFull); requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds, TimeUnit.MILLISECONDS); } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) { final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); @@ -512,7 +516,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off channel auto-read", this); } } - ctx.channel().config().setAutoRead(false); + getThrottleTracker().markThrottled(ThrottleType.ConnectionOutboundBufferFull); } ctx.fireChannelWritabilityChanged(); } @@ -3450,7 +3454,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // or the pending publish bytes private void increasePendingSendRequestsAndPublishBytes(int msgSize) { if (++pendingSendRequest == maxPendingSendRequests) { - throttleTracker.setPendingSendRequestsExceeded(true); + throttleTracker.markThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded); } PendingBytesPerThreadTracker.getInstance().incrementPublishBytes(msgSize, maxPendingBytesPerThread); } @@ -3475,7 +3479,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { PendingBytesPerThreadTracker.getInstance().decrementPublishBytes(msgSize, resumeThresholdPendingBytesPerThread); if (--pendingSendRequest == resumeReadsThreshold) { - throttleTracker.setPendingSendRequestsExceeded(false); + throttleTracker.unmarkThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded); } if (isNonPersistentTopic) { @@ -3854,22 +3858,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { this.authRole = authRole; } - /** - * {@inheritDoc} - */ - @Override - public void incrementThrottleCount() { - throttleTracker.incrementThrottleCount(); - } - - /** - * {@inheritDoc} - */ - @Override - public void decrementThrottleCount() { - throttleTracker.decrementThrottleCount(); - } - @VisibleForTesting void setAuthState(AuthenticationState authState) { this.authState = authState; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java index 78bac024218..037612d2156 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java @@ -18,125 +18,442 @@ */ package org.apache.pulsar.broker.service; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; /** - * Tracks the state of throttling for a connection. The throttling happens by pausing reads by setting - * Netty {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to false for the channel (connection). - * <p> - * There can be multiple rate limiters that can throttle a connection. Each rate limiter will independently - * call the {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()} methods to signal that the - * connection should be throttled or not. The connection will be throttled if the counter is greater than 0. - * <p> - * Besides the rate limiters, the connection can also be throttled if the number of pending publish requests exceeds - * a configured threshold. This throttling is toggled with the {@link #setPendingSendRequestsExceeded} method. - * There's also per-thread memory limits which could throttle the connection. This throttling is toggled with the - * {@link #setPublishBufferLimiting} method. Internally, these two methods will call the - * {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()} methods when the state changes. + * Manages and tracks throttling state for server connections in Apache Pulsar. + * + * <p>This class provides a centralized mechanism to control connection throttling by managing + * multiple throttling conditions simultaneously. When throttling is active, it pauses incoming + * requests by setting Netty's {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to + * {@code false} for the associated channel. + * + * <h3>Throttling Mechanism</h3> + * <p>The tracker maintains independent counters for different types of throttling conditions + * defined in {@link ThrottleType}. A connection is considered throttled if any of these + * conditions are active (counter > 0). The connection will only resume normal operation + * when all throttling conditions are cleared. + * + * <h3>Supported Throttling Types</h3> + * <ul> + * <li><b>Connection-level:</b> Max pending publish requests, outbound buffer limits</li> + * <li><b>Thread-level:</b> IO thread memory limits for in-flight publishing</li> + * <li><b>Topic-level:</b> Topic publish rate limiting</li> + * <li><b>Resource Group-level:</b> Resource group publish rate limiting</li> + * <li><b>Broker-level:</b> Global broker publish rate limiting</li> + * <li><b>Flow Control:</b> Channel writability and cooldown rate limiting</li> + * </ul> + * + * <h3>Reentrant vs Non-Reentrant Types</h3> + * <p>Some throttling types support multiple concurrent activations (reentrant): + * <ul> + * <li>{@link ThrottleType#TopicPublishRate} - Reentrant because multiple producers may share + * the same rate limiter which relates to the same topic</li> + * <li>{@link ThrottleType#ResourceGroupPublishRate} - Reentrant because multiple producers may share + * the same rate limiter which relates to the same resource group</li> + * </ul> + * <p>Other types are non-reentrant and can only be activated once at a time. The reentrant types + * use counters to track how many producers are affected by the same shared rate limiter, while + * non-reentrant types use simple boolean states. + * + * <h3>Thread Safety</h3> + * <p>This class is designed to be used from a single thread (the connection's IO thread) + * and is not thread-safe for concurrent access from multiple threads. + * + * <h3>Usage Example</h3> + * <pre>{@code + * ServerCnxThrottleTracker tracker = new ServerCnxThrottleTracker(serverCnx); + * + * // Mark connection as throttled due to rate limiting + * tracker.markThrottled(ThrottleType.TopicPublishRate); + * + * // Later, when rate limiting condition is cleared + * tracker.unmarkThrottled(ThrottleType.TopicPublishRate); + * }</pre> + * + * @see ThrottleType + * @see ThrottleRes + * @see ServerCnx */ @Slf4j -final class ServerCnxThrottleTracker { - - private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> THROTTLE_COUNT_UPDATER = - AtomicIntegerFieldUpdater.newUpdater( - ServerCnxThrottleTracker.class, "throttleCount"); - - private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> - PENDING_SEND_REQUESTS_EXCEEDED_UPDATER = - AtomicIntegerFieldUpdater.newUpdater( - ServerCnxThrottleTracker.class, "pendingSendRequestsExceeded"); - private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> PUBLISH_BUFFER_LIMITING_UPDATER = - AtomicIntegerFieldUpdater.newUpdater( - ServerCnxThrottleTracker.class, "publishBufferLimiting"); +public final class ServerCnxThrottleTracker { + private final ServerCnx serverCnx; - private volatile int throttleCount; - private volatile int pendingSendRequestsExceeded; - private volatile int publishBufferLimiting; + private final int[] states = new int[ThrottleType.values().length]; + /** + * Enumeration of different throttling conditions that can be applied to a server connection. + * + * <p>Each type represents a specific resource constraint or rate limiting condition + * that may require throttling the connection to maintain system stability and fairness. + * + * <p>Some types support reentrant behavior (can be activated multiple times concurrently), + * while others are non-reentrant (single activation only). + */ + public enum ThrottleType { - public ServerCnxThrottleTracker(ServerCnx serverCnx) { - this.serverCnx = serverCnx; + /** + * Throttling due to excessive pending publish requests on the connection. + * + * <p>This throttling is activated when the number of in-flight publish requests + * exceeds the configured limit. It helps prevent memory exhaustion and ensures + * fair resource allocation across connections. + * + * <p><b>Type:</b> Non-reentrant + * <p><b>Configuration:</b> {@link ServiceConfiguration#getMaxPendingPublishRequestsPerConnection()} + */ + ConnectionMaxPendingPublishRequestsExceeded(false), + + /** + * Throttling due to excessive memory usage by in-flight publish operations on the IO thread. + * + * <p>This throttling is activated when the total memory used by pending publish operations + * on a shared IO thread exceeds the configured limit. Multiple connections may share the + * same IO thread, so this limit applies across all connections on that thread. + * + * <p><b>Type:</b> Non-reentrant + * <p><b>Configuration:</b> {@link ServiceConfiguration#getMaxMessagePublishBufferSizeInMB()} + */ + IOThreadMaxPendingPublishBytesExceeded(false), + + /** + * Throttling due to topic-level publish rate limiting. + * + * <p>This throttling is activated when publish operations exceed the configured + * rate limits for a specific topic. Multiple producers on the same topic may + * contribute to triggering this throttling condition. + * + * <p><b>Type:</b> Reentrant (supports multiple concurrent activations) + * <br><b>Reason for reentrancy:</b> Multiple producers may share the same rate limiter + * which relates to the same topic. Each producer can independently trigger throttling + * when the shared topic rate limiter becomes active, requiring a counter to track + * how many producers are affected by the same rate limiter. + * + * <p><b>Configuration:</b> Topic-level publish rate policies + */ + TopicPublishRate(true), + + /** + * Throttling due to resource group-level publish rate limiting. + * + * <p>This throttling is activated when publish operations exceed the configured + * rate limits for a resource group. Resource groups allow fine-grained control + * over resource allocation across multiple topics and tenants. + * + * <p><b>Type:</b> Reentrant (supports multiple concurrent activations) + * <br><b>Reason for reentrancy:</b> Multiple producers may share the same rate limiter + * which relates to the same resource group. Each producer can independently trigger + * throttling when the shared resource group rate limiter becomes active, requiring + * a counter to track how many producers are affected by the same rate limiter. + * + * <p><b>Configuration:</b> Resource group publish rate policies + */ + ResourceGroupPublishRate(true), + + /** + * Throttling due to broker-level publish rate limiting. + * + * <p>This throttling is activated when publish operations exceed the global + * broker-level rate limits. This provides a safety mechanism to prevent + * the entire broker from being overwhelmed by publish traffic. + * + * <p><b>Type:</b> Non-reentrant + * <p><b>Configuration:</b> {@link ServiceConfiguration#getBrokerPublisherThrottlingMaxMessageRate()} + * and {@link ServiceConfiguration#getBrokerPublisherThrottlingMaxByteRate()} + */ + BrokerPublishRate(false), + /** + * Throttling due to channel outbound buffer being full. + * + * <p>This throttling is activated when the Netty channel's outbound buffer + * reaches its high water mark, indicating that the client cannot keep up + * with the rate of outgoing messages. This prevents memory exhaustion + * and provides backpressure to publishers. + * + * <p><b>Type:</b> Non-reentrant + * <p><b>Reference:</b> PIP-434: Expose Netty channel configuration WRITE_BUFFER_WATER_MARK + */ + ConnectionOutboundBufferFull(false), + + /** + * Throttling due to connection pause/resume cooldown rate limiting. + * + * <p>This throttling is activated during cooldown periods after a connection + * has been resumed from a throttled state. It prevents rapid oscillation + * between throttled and unthrottled states. + * + * <p><b>Type:</b> Non-reentrant + */ + ConnectionPauseReceivingCooldownRateLimit(false); + + @Getter + final boolean reentrant; + + ThrottleType(boolean reentrant) { + this.reentrant = reentrant; + } } /** - * See {@link Producer#incrementThrottleCount()} for documentation. + * Enumeration representing the result of a throttling state change operation. + * + * <p>This enum indicates what happened when a throttling condition was marked or unmarked, + * helping callers understand whether the overall connection state changed or if the + * operation was ignored. */ - public void incrementThrottleCount() { - int currentThrottleCount = THROTTLE_COUNT_UPDATER.incrementAndGet(this); - if (currentThrottleCount == 1) { - changeAutoRead(false); + enum ThrottleRes { + /** + * The operation resulted in a change to the overall connection throttling state. + * + * <p>This occurs when: + * <ul> + * <li>The connection transitions from unthrottled to throttled (first throttle type activated)</li> + * <li>The connection transitions from throttled to unthrottled (last throttle type deactivated)</li> + * </ul> + * + * <p>When this result is returned, the connection's auto-read setting will be updated + * accordingly to pause or resume request processing. + */ + ConnectionStateChanged, + + /** + * The operation changed the state of the specific throttle type but did not affect + * the overall connection throttling state. + * + * <p>This occurs when: + * <ul> + * <li>A throttle type is activated, but the connection was already throttled by other types</li> + * <li>A throttle type is deactivated, but the connection remains throttled by other types</li> + * <li>A reentrant throttle type's counter is incremented or decremented</li> + * </ul> + */ + TypeStateChanged, + + /** + * The operation was dropped because it would violate the throttle type's constraints. + * + * <p>This occurs when: + * <ul> + * <li>Attempting to mark a non-reentrant throttle type that is already active</li> + * <li>Attempting to unmark a throttle type that is not currently active</li> + * <li>Attempting to unmark a reentrant throttle type with an invalid counter state</li> + * </ul> + */ + Dropped + } + + /** + * Checks if the connection is currently throttled by any throttle type. + * + * <p>This method examines all throttle type states and returns {@code true} + * if any of them are active (counter > 0). + * + * @return {@code true} if any throttling condition is active, {@code false} otherwise + */ + private boolean hasThrottled() { + for (int stat : states) { + if (stat > 0) { + return true; + } } + return false; } /** - * See {@link Producer#decrementThrottleCount()} for documentation. + * Returns the total count of active throttling conditions across all types. + * + * <p>This method sums up all the individual counters for each throttle type, + * providing a measure of the overall throttling pressure on the connection. + * For reentrant types, this includes the full counter value (not just 0 or 1). + * + * @return the total number of active throttling conditions */ - public void decrementThrottleCount() { - int currentThrottleCount = THROTTLE_COUNT_UPDATER.decrementAndGet(this); - if (currentThrottleCount == 0) { - changeAutoRead(true); + @VisibleForTesting + public int throttledCount() { + int i = 0; + for (int stat : states) { + i += stat; } + return i; } - private void changeAutoRead(boolean autoRead) { - if (isChannelActive()) { + /** + * Marks the connection as throttled for the specified throttle type. + * + * <p>This method activates throttling for the given type and may pause the connection's + * request processing if this is the first active throttling condition. For reentrant + * types ({@link ThrottleType#TopicPublishRate} and {@link ThrottleType#ResourceGroupPublishRate}), + * this increments the counter. For non-reentrant types, this sets the state to active. + * + * <p>If the connection transitions from unthrottled to throttled, this method will + * set the Netty channel's auto-read to {@code false}, effectively pausing incoming + * request processing. + * + * <p>Metrics are automatically recorded to track throttling events and connection state changes. + * + * @param type the type of throttling condition to activate + * @throws IllegalArgumentException if type is null + * + * @see #unmarkThrottled(ThrottleType) + * @see ThrottleType + */ + public void markThrottled(ThrottleType type) { + assert serverCnx.ctx().executor().inEventLoop() : "This method should be called in serverCnx.ctx().executor()"; + ThrottleRes res = doMarkThrottled(type); + recordMetricsAfterThrottling(type, res); + if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) { if (log.isDebugEnabled()) { - log.debug("[{}] Setting auto read to {}", serverCnx.toString(), autoRead); + log.debug("[{}] Setting auto read to false", serverCnx.toString()); } - // change the auto read flag on the channel - serverCnx.ctx().channel().config().setAutoRead(autoRead); - } - // update the metrics that track throttling - if (autoRead) { - serverCnx.getBrokerService().recordConnectionResumed(); - } else if (isChannelActive()) { - serverCnx.increasePublishLimitedTimesForTopics(); - serverCnx.getBrokerService().recordConnectionPaused(); + serverCnx.ctx().channel().config().setAutoRead(false); } } - private boolean isChannelActive() { - return serverCnx.isActive() && serverCnx.ctx() != null && serverCnx.ctx().channel().isActive(); + /** + * Unmarks the connection as throttled for the specified throttle type. + * + * <p>This method deactivates throttling for the given type and may resume the connection's + * request processing if this was the last active throttling condition. For reentrant + * types ({@link ThrottleType#TopicPublishRate} and {@link ThrottleType#ResourceGroupPublishRate}), + * this decrements the counter. For non-reentrant types, this clears the active state. + * + * <p>If the connection transitions from throttled to unthrottled, this method will + * set the Netty channel's auto-read to {@code true}, effectively resuming incoming + * request processing. + * + * <p>Metrics are automatically recorded to track unthrottling events and connection state changes. + * + * @param type the type of throttling condition to deactivate + * @throws IllegalArgumentException if type is null + * + * @see #markThrottled(ThrottleType) + * @see ThrottleType + */ + public void unmarkThrottled(ThrottleType type) { + assert serverCnx.ctx().executor().inEventLoop() : "This method should be called in serverCnx.ctx().executor()"; + ThrottleRes res = doUnmarkThrottled(type); + recordMetricsAfterUnthrottling(type, res); + if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Setting auto read to true", serverCnx.toString()); + } + serverCnx.ctx().channel().config().setAutoRead(true); + } } - public void setPublishBufferLimiting(boolean throttlingEnabled) { - changeThrottlingFlag(PUBLISH_BUFFER_LIMITING_UPDATER, throttlingEnabled); + /** + * Internal method to mark a throttle type as active without side effects. + * + * <p>This method updates the internal state for the specified throttle type + * and returns the result of the operation. It handles both reentrant and + * non-reentrant throttle types appropriately: + * + * <ul> + * <li><b>Reentrant types:</b> Increment the counter</li> + * <li><b>Non-reentrant types:</b> Set to active (1) if not already active</li> + * </ul> + * + * @param throttleType the type of throttling to mark as active + * @return the result of the marking operation + * @see ThrottleRes + */ + private ThrottleRes doMarkThrottled(ThrottleType throttleType) { + // Two reentrant type: "TopicPublishRate" and "ResourceGroupPublishRate". + boolean throttled = hasThrottled(); + int value = states[throttleType.ordinal()]; + if (throttleType.isReentrant()) { + states[throttleType.ordinal()] = value + 1; + } else { + states[throttleType.ordinal()] = 1; + if (value != 0) { + return ThrottleRes.Dropped; + } + } + return throttled ? ThrottleRes.TypeStateChanged : ThrottleRes.ConnectionStateChanged; } - public void setPendingSendRequestsExceeded(boolean throttlingEnabled) { - boolean changed = changeThrottlingFlag(PENDING_SEND_REQUESTS_EXCEEDED_UPDATER, throttlingEnabled); - if (changed) { - // update the metrics that track throttling due to pending send requests - if (throttlingEnabled) { - serverCnx.getBrokerService().recordConnectionThrottled(); - } else { - serverCnx.getBrokerService().recordConnectionUnthrottled(); + /** + * Internal method to unmark a throttle type as active without side effects. + * + * <p>This method updates the internal state for the specified throttle type + * and returns the result of the operation. It handles both reentrant and + * non-reentrant throttle types appropriately: + * + * <ul> + * <li><b>Reentrant types:</b> Decrement the counter</li> + * <li><b>Non-reentrant types:</b> Clear active state if currently active</li> + * </ul> + * + * @param throttleType the type of throttling to mark as inactive + * @return the result of the unmarking operation + * @see ThrottleRes + */ + private ThrottleRes doUnmarkThrottled(ThrottleType throttleType) { + int value = states[throttleType.ordinal()]; + if (throttleType.isReentrant()) { + states[throttleType.ordinal()] = value - 1; + } else { + if (value != 1) { + return ThrottleRes.Dropped; } + states[throttleType.ordinal()] = 0; + } + return hasThrottled() ? ThrottleRes.TypeStateChanged : ThrottleRes.ConnectionStateChanged; + } + + /** + * Records metrics after a throttling operation. + * + * <p>This method updates various broker metrics to track throttling events: + * <ul> + * <li>Connection-specific throttling metrics for in-flight publishing limits</li> + * <li>Connection pause metrics when the overall connection state changes</li> + * <li>Topic-level publish limiting counters</li> + * </ul> + * + * @param type the throttle type that was activated + * @param res the result of the throttling operation + */ + private void recordMetricsAfterThrottling(ThrottleType type, ThrottleRes res) { + if (type == ThrottleType.ConnectionMaxPendingPublishRequestsExceeded && res != ThrottleRes.Dropped) { + serverCnx.getBrokerService().recordConnectionThrottled(); + } + if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) { + serverCnx.increasePublishLimitedTimesForTopics(); + serverCnx.getBrokerService().recordConnectionPaused(); } } - private boolean changeThrottlingFlag(AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> throttlingFlagFieldUpdater, - boolean throttlingEnabled) { - // don't change a throttling flag if the channel is not active - if (!isChannelActive()) { - return false; + /** + * Records metrics after an unthrottling operation. + * + * <p>This method updates various broker metrics to track unthrottling events: + * <ul> + * <li>Connection-specific unthrottling metrics for in-flight publishing limits</li> + * <li>Connection resume metrics when the overall connection state changes</li> + * </ul> + * + * @param type the throttle type that was deactivated + * @param res the result of the unthrottling operation + */ + private void recordMetricsAfterUnthrottling(ThrottleType type, ThrottleRes res) { + if (type == ThrottleType.ConnectionMaxPendingPublishRequestsExceeded && res != ThrottleRes.Dropped) { + serverCnx.getBrokerService().recordConnectionUnthrottled(); } - if (throttlingFlagFieldUpdater.compareAndSet(this, booleanToInt(!throttlingEnabled), - booleanToInt(throttlingEnabled))) { - if (throttlingEnabled) { - incrementThrottleCount(); - } else { - decrementThrottleCount(); - } - return true; - } else { - return false; + if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) { + serverCnx.getBrokerService().recordConnectionResumed(); } } - private static int booleanToInt(boolean value) { - return value ? 1 : 0; + public ServerCnxThrottleTracker(ServerCnx serverCnx) { + this.serverCnx = serverCnx; + } + + private boolean isChannelActive() { + return serverCnx.isActive() && serverCnx.ctx() != null && serverCnx.ctx().channel().isActive(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java index 63599f09eef..2c0b247a94b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -88,22 +88,9 @@ public interface TransportCnx { CompletableFuture<Optional<Boolean>> checkConnectionLiveness(); /** - * Increments the counter that controls the throttling of the connection by pausing reads. - * The connection will be throttled while the counter is greater than 0. - * <p> - * The caller is responsible for decrementing the counter by calling {@link #decrementThrottleCount()} when the - * connection should no longer be throttled. + * Get the throttle tracker for this connection. */ - void incrementThrottleCount(); - - /** - * Decrements the counter that controls the throttling of the connection by pausing reads. - * The connection will be throttled while the counter is greater than 0. - * <p> - * This method should be called when the connection should no longer be throttled. However, the caller should have - * previously called {@link #incrementThrottleCount()}. - */ - void decrementThrottleCount(); + ServerCnxThrottleTracker getThrottleTracker(); FeatureFlags getFeatures(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java index ec952a7ca77..3e6edb04932 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java @@ -18,10 +18,11 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import org.testng.Assert; import org.testng.annotations.Test; public class PublishRateLimiterDisableTest { @@ -29,9 +30,23 @@ public class PublishRateLimiterDisableTest { // GH issue #10603 @Test void shouldAlwaysAllowAcquire() { - PublishRateLimiter publishRateLimiter = new PublishRateLimiterImpl(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK); + PublishRateLimiter publishRateLimiter = new PublishRateLimiterImpl(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK, + producer -> { + producer.getCnx().getThrottleTracker().markThrottled( + ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate); + }, producer -> { + producer.getCnx().getThrottleTracker().unmarkThrottled( + ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate); + }); Producer producer = mock(Producer.class); + ServerCnx serverCnx = mock(ServerCnx.class); + doAnswer(a -> serverCnx).when(producer).getCnx(); + ServerCnxThrottleTracker throttleTracker = new ServerCnxThrottleTracker(serverCnx); + doAnswer(a -> throttleTracker).when(serverCnx).getThrottleTracker(); + when(producer.getCnx()).thenReturn(serverCnx); + BrokerService brokerService = mock(BrokerService.class); + when(serverCnx.getBrokerService()).thenReturn(brokerService); publishRateLimiter.handlePublishThrottling(producer, Integer.MAX_VALUE, Long.MAX_VALUE); - verify(producer, never()).incrementThrottleCount(); + Assert.assertEquals(throttleTracker.throttledCount(), 0); } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index 20c1ad0a412..573e3980c73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -19,18 +19,18 @@ package org.apache.pulsar.broker.service; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.HashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -47,36 +47,40 @@ public class PublishRateLimiterTest { private AtomicLong manualClockSource; private Producer producer; + private ServerCnx serverCnx; private PublishRateLimiterImpl publishRateLimiter; - - private AtomicInteger throttleCount = new AtomicInteger(0); + private ServerCnxThrottleTracker throttleTracker; + private DefaultThreadFactory threadFactory = new DefaultThreadFactory("pulsar-io"); + private EventLoop eventLoop = new DefaultEventLoop(threadFactory); @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); manualClockSource = new AtomicLong(TimeUnit.SECONDS.toNanos(100)); - publishRateLimiter = new PublishRateLimiterImpl(() -> manualClockSource.get()); + publishRateLimiter = new PublishRateLimiterImpl(() -> manualClockSource.get(), + producer -> { + producer.getCnx().getThrottleTracker().markThrottled( + ServerCnxThrottleTracker.ThrottleType.TopicPublishRate); + }, producer -> { + producer.getCnx().getThrottleTracker().unmarkThrottled( + ServerCnxThrottleTracker.ThrottleType.TopicPublishRate); + }); publishRateLimiter.update(policies, CLUSTER_NAME); producer = mock(Producer.class); - throttleCount.set(0); - doAnswer(a -> { - throttleCount.incrementAndGet(); - return null; - }).when(producer).incrementThrottleCount(); - doAnswer(a -> { - throttleCount.decrementAndGet(); - return null; - }).when(producer).decrementThrottleCount(); - TransportCnx transportCnx = mock(TransportCnx.class); - when(producer.getCnx()).thenReturn(transportCnx); + serverCnx = mock(ServerCnx.class); + ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); + doAnswer(a -> eventLoop).when(channelHandlerContext).executor(); + doAnswer(a -> channelHandlerContext).when(serverCnx).ctx(); + doAnswer(a -> this.serverCnx).when(producer).getCnx(); + throttleTracker = new ServerCnxThrottleTracker(this.serverCnx); + doAnswer(a -> throttleTracker).when(this.serverCnx).getThrottleTracker(); + when(producer.getCnx()).thenReturn(serverCnx); BrokerService brokerService = mock(BrokerService.class); - when(transportCnx.getBrokerService()).thenReturn(brokerService); + when(serverCnx.getBrokerService()).thenReturn(brokerService); EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); when(brokerService.executor()).thenReturn(eventLoopGroup); - EventLoop eventLoop = mock(EventLoop.class); when(eventLoopGroup.next()).thenReturn(eventLoop); - doReturn(null).when(eventLoop).schedule(any(Runnable.class), anyLong(), any()); incrementSeconds(1); } @@ -86,38 +90,63 @@ public class PublishRateLimiterTest { policies.publishMaxMessageRate = null; } + @AfterMethod + public void tearDown() throws Exception { + eventLoop.shutdownGracefully(); + } + private void incrementSeconds(int seconds) { manualClockSource.addAndGet(TimeUnit.SECONDS.toNanos(seconds)); } @Test public void testPublishRateLimiterImplExceed() throws Exception { - // increment not exceed - publishRateLimiter.handlePublishThrottling(producer, 5, 50); - assertEquals(throttleCount.get(), 0); + CompletableFuture<Void> future = new CompletableFuture<>(); + eventLoop.execute(() -> { + try { + // increment not exceed + publishRateLimiter.handlePublishThrottling(producer, 5, 50); + assertEquals(throttleTracker.throttledCount(), 0); - incrementSeconds(1); + incrementSeconds(1); - // numOfMessages increment exceeded - publishRateLimiter.handlePublishThrottling(producer, 11, 100); - assertEquals(throttleCount.get(), 1); + // numOfMessages increment exceeded + publishRateLimiter.handlePublishThrottling(producer, 11, 100); + assertEquals(throttleTracker.throttledCount(), 1); - incrementSeconds(1); + incrementSeconds(1); + + // msgSizeInBytes increment exceeded + publishRateLimiter.handlePublishThrottling(producer, 9, 110); + assertEquals(throttleTracker.throttledCount(), 2); - // msgSizeInBytes increment exceeded - publishRateLimiter.handlePublishThrottling(producer, 9, 110); - assertEquals(throttleCount.get(), 2); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + future.get(5, TimeUnit.SECONDS); } @Test - public void testPublishRateLimiterImplUpdate() { - publishRateLimiter.handlePublishThrottling(producer, 11, 110); - assertEquals(throttleCount.get(), 1); - - // update - throttleCount.set(0); - publishRateLimiter.update(newPublishRate); - publishRateLimiter.handlePublishThrottling(producer, 11, 110); - assertEquals(throttleCount.get(), 0); + public void testPublishRateLimiterImplUpdate() throws Exception { + CompletableFuture<Void> future = new CompletableFuture<>(); + eventLoop.execute(() -> { + try { + publishRateLimiter.handlePublishThrottling(producer, 11, 110); + assertEquals(throttleTracker.throttledCount(), 1); + + // update + throttleTracker = new ServerCnxThrottleTracker(serverCnx); + publishRateLimiter.update(newPublishRate); + publishRateLimiter.handlePublishThrottling(producer, 11, 110); + assertEquals(throttleTracker.throttledCount(), 0); + + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + future.get(5, TimeUnit.SECONDS); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java index 40bcb19ab0c..929350b599e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java @@ -73,6 +73,37 @@ public class TopicPublishRateThrottleTest extends BrokerTestBase{ pulsarClient.close(); } + @Test + public void testResumeEvenProducerClosed() throws Exception { + PublishRate publishRate = new PublishRate(1, 10); + conf.setMaxPendingPublishRequestsPerConnection(0); + super.baseSetup(); + admin.namespaces().setPublishRate("prop/ns-abc", publishRate); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic).create(); + + Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); + Assert.assertNotNull(topicRef); + MessageId messageId = null; + // first will be success, and the second will fail, will set auto read to false. + messageId = producer.sendAsync(new byte[10]).get(500, TimeUnit.MILLISECONDS); + Assert.assertNotNull(messageId); + // second will be blocked + producer.sendAsync(new byte[10]); + + // Verify: even through the producer was closed before the unblock, the state should be unblocked at the next + // period of rate limiter. + producer.close(); + Thread.sleep(3000); + org.apache.pulsar.client.api.Producer<byte[]> producer2 = pulsarClient.newProducer() + .topic(topic).create(); + producer2.sendAsync(new byte[2]).get(500, TimeUnit.MILLISECONDS); + + // Close the PulsarClient gracefully to avoid ByteBuf leak + pulsarClient.close(); + } + @Test public void testSystemTopicPublishNonBlock() throws Exception { super.baseSetup();
