poorbarcode commented on code in PR #24423: URL: https://github.com/apache/pulsar/pull/24423#discussion_r2375482270
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { + if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG + || cmd.getType() == BaseCommand.Type.PING) { + return; + } + if (log.isDebugEnabled()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel isWritable: {}", + cmd.getType(), outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable()); + } else { + log.debug("Start to handle request [{}], channel isWritable: {}", + cmd.getType(), ctx.channel().isWritable()); + } + } + // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called + // "timingOpen(duration)"). + if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) { + log.warn("[{}] Reached rate limitation", this); + // Stop receiving requests. + pausedDueToRateLimitation = true; + ctx.channel().config().setAutoRead(false); + // Resume after 1 second. 
+ ctx.channel().eventLoop().schedule(() -> { + if (pausedDueToRateLimitation) { + log.info("[{}] Resuming connection after rate limitation", this); + ctx.channel().config().setAutoRead(true); + pausedDueToRateLimitation = false; + } + }, 1, TimeUnit.SECONDS); + } + } + @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (log.isDebugEnabled()) { - log.debug("Channel writability has changed to: {}", ctx.channel().isWritable()); + if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) { + log.info("[{}] is writable, turn on channel auto-read", this); + ctx.channel().config().setAutoRead(true); + requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, TimeUnit.SECONDS); + } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.warn("[{}] is not writable, turn off channel auto-read, totalPendingWriteBytes: {}", + this, outboundBuffer.totalPendingWriteBytes()); + } else { + log.warn("[{}] is not writable, turn off channel auto-read", this); + } Review Comment: Improved, thanks for your suggestion ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java: ########## @@ -919,6 +919,58 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int brokerMaxConnections = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"." + ) + private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. 
If the number of bytes" + " queued in the write buffer is smaller than this value, channel writable state will start to return" + " \"true\"." + ) + private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "If enabled, the broker will pause reading from the channel to handle new requests once the write" + " buffer is full, until it becomes writable again." + ) + private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection recovers from a paused receiving state, the channel will be rate-limited" + " for a period of time to avoid being overwhelmed by the backlog of requests. This parameter defines" + " how long the rate limiting should last, in milliseconds. Once the bytes that are waiting to be sent out" + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + " value will disable the rate limiting." + ) + private int pulsarChannelPauseReceivingCooldownMilliSeconds = 5000; Review Comment: Corrected the doc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org