lhotari commented on code in PR #24423: URL: https://github.com/apache/pulsar/pull/24423#discussion_r2374904864
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { + if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG + || cmd.getType() == BaseCommand.Type.PING) { + return; + } + if (log.isDebugEnabled()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel isWritable: {}", + cmd.getType(), outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable()); + } else { + log.debug("Start to handle request [{}], channel isWritable: {}", + cmd.getType(), ctx.channel().isWritable()); + } + } + // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called + // "timingOpen(duration)"). + if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) { + log.warn("[{}] Reached rate limitation", this); + // Stop receiving requests. + pausedDueToRateLimitation = true; + ctx.channel().config().setAutoRead(false); + // Resume after 1 second. 
+ ctx.channel().eventLoop().schedule(() -> { + if (pausedDueToRateLimitation) { + log.info("[{}] Resuming connection after rate limitation", this); + ctx.channel().config().setAutoRead(true); + pausedDueToRateLimitation = false; + } + }, 1, TimeUnit.SECONDS); + } + } + @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (log.isDebugEnabled()) { - log.debug("Channel writability has changed to: {}", ctx.channel().isWritable()); + if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) { + log.info("[{}] is writable, turn on channel auto-read", this); + ctx.channel().config().setAutoRead(true); + requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, TimeUnit.SECONDS); + } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.warn("[{}] is not writable, turn off channel auto-read, totalPendingWriteBytes: {}", + this, outboundBuffer.totalPendingWriteBytes()); + } else { + log.warn("[{}] is not writable, turn off channel auto-read", this); + }

Review Comment:

> Changed the log level to info(because it is really helpful for troubleshooting).

As explained above, there is no issue when the channel switches to unwritable. There's nothing to troubleshoot. It will be flooding the logs, especially with the current default `pulsarChannelWriteBufferHighWaterMark` of 64kB. It doesn't make much sense to keep it at info level.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org