lhotari commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2374904864


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
         }
     }
 
+    private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand 
cmd) {
+        if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() 
== BaseCommand.Type.PONG
+                || cmd.getType() == BaseCommand.Type.PING) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                log.debug("Start to handle request [{}], 
totalPendingWriteBytes: {}, channel isWritable: {}",
+                        cmd.getType(), 
outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable());
+            } else {
+                log.debug("Start to handle request [{}], channel isWritable: 
{}",
+                        cmd.getType(), ctx.channel().isWritable());
+            }
+        }
+        // "requestRateLimiter" will return the permits that you acquired if 
it is not opening(has been called
+        // "timingOpen(duration)").
+        if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
+            log.warn("[{}] Reached rate limitation", this);
+            // Stop receiving requests.
+            pausedDueToRateLimitation = true;
+            ctx.channel().config().setAutoRead(false);
+            // Resume after 1 second.
+            ctx.channel().eventLoop().schedule(() -> {
+                if (pausedDueToRateLimitation) {
+                    log.info("[{}] Resuming connection after rate limitation", 
this);
+                    ctx.channel().config().setAutoRead(true);
+                    pausedDueToRateLimitation = false;
+                }
+            }, 1, TimeUnit.SECONDS);
+        }
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Channel writability has changed to: {}", 
ctx.channel().isWritable());
+        if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+            log.info("[{}] is writable, turn on channel auto-read", this);
+            ctx.channel().config().setAutoRead(true);
+            
requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, 
TimeUnit.SECONDS);
+        } else if (pauseReceivingRequestsIfUnwritable && 
!ctx.channel().isWritable()) {
+            final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                log.warn("[{}] is not writable, turn off channel auto-read, 
totalPendingWriteBytes: {}",
+                        this, outboundBuffer.totalPendingWriteBytes());
+            } else {
+                log.warn("[{}] is not writable, turn off channel auto-read", 
this);
+            }

Review Comment:
   > Changed the log level to info(because it is really helpful for 
troubleshooting).
   
   As explained above, there is no issue when the channel switches to 
unwritable. There's nothing to troubleshoot. It will be flooding the logs, 
especially with the current default `pulsarChannelWriteBufferHighWaterMark` of 
64kB. It doesn't make much sense to keep it at info level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to