lhotari commented on code in PR #24423: URL: https://github.com/apache/pulsar/pull/24423#discussion_r2375426335
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { + if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG + || cmd.getType() == BaseCommand.Type.PING) { + return; + } + if (log.isDebugEnabled()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel isWritable: {}", + cmd.getType(), outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable()); + } else { + log.debug("Start to handle request [{}], channel isWritable: {}", + cmd.getType(), ctx.channel().isWritable()); + } + } + // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called + // "timingOpen(duration)"). + if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) { + log.warn("[{}] Reached rate limitation", this); + // Stop receiving requests. + pausedDueToRateLimitation = true; + ctx.channel().config().setAutoRead(false); + // Resume after 1 second. + ctx.channel().eventLoop().schedule(() -> { + if (pausedDueToRateLimitation) { + log.info("[{}] Resuming connection after rate limitation", this); + ctx.channel().config().setAutoRead(true); + pausedDueToRateLimitation = false; + } + }, 1, TimeUnit.SECONDS); + } + } + @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (log.isDebugEnabled()) { - log.debug("Channel writability has changed to: {}", ctx.channel().isWritable()); + if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) { + log.info("[{}] is writable, turn on channel auto-read", this); + ctx.channel().config().setAutoRead(true); + requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, TimeUnit.SECONDS); + } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.warn("[{}] is not writable, turn off channel auto-read, totalPendingWriteBytes: {}", + this, outboundBuffer.totalPendingWriteBytes()); + } else { + log.warn("[{}] is not writable, turn off channel auto-read", this); + } Review Comment: * change log level to debug and add a `log.isDebugEnabled()` guard * if there's a need to troubleshoot in some environment, that could be handled with logging configuration with a unique logger name and enabling debug logging for just that logger. It's possible a sub-loggers with the name `org.apache.pulsar.broker.service.ServerCnx.pauseReceiving` so that debug logs for that feature could be enabled separately from ServerCnx debug logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org