lhotari commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2373384002


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         }
     }
 
+    private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
+        if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG

Review Comment:
   The later comment about "Please ensure that this is a no-op when this feature is disabled" hasn't been addressed yet. I would assume that it could be addressed by adding `!pulsarChannelPauseReceivingRequestsIfUnwritable ||` here.
   ```suggestion
           if (!pulsarChannelPauseReceivingRequestsIfUnwritable || rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG
   ```
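   A minimal sketch of the guard being asked for, assuming the PR reads the feature flag into a boolean and the cooldown into an int. `PauseGuardSketch`, `CommandType`, and `skipRateLimitCheck` are hypothetical names used only for illustration; they are not Pulsar APIs. The point is that every disabled configuration returns before the limiter is touched, so the check is a no-op when the feature is off.

```java
import java.util.EnumSet;
import java.util.Set;

public class PauseGuardSketch {
    // Hypothetical stand-in for BaseCommand.Type, reduced to the values needed here.
    enum CommandType { PING, PONG, SEND, SUBSCRIBE }

    // Heartbeat commands are always let through without touching the limiter.
    private static final Set<CommandType> HEARTBEATS = EnumSet.of(CommandType.PING, CommandType.PONG);

    /**
     * Returns true when the rate-limit check should be skipped entirely:
     * the pause feature is disabled, the cooldown is disabled, or the command is a heartbeat.
     */
    static boolean skipRateLimitCheck(boolean pauseReceivingRequestsIfUnwritable,
                                      int cooldownSeconds, CommandType type) {
        return !pauseReceivingRequestsIfUnwritable
                || cooldownSeconds <= 0
                || HEARTBEATS.contains(type);
    }

    public static void main(String[] args) {
        System.out.println(skipRateLimitCheck(false, 5, CommandType.SEND)); // true: feature disabled
        System.out.println(skipRateLimitCheck(true, 0, CommandType.SEND));  // true: cooldown disabled
        System.out.println(skipRateLimitCheck(true, 5, CommandType.PONG));  // true: heartbeat
        System.out.println(skipRateLimitCheck(true, 5, CommandType.SEND));  // false: limiter is consulted
    }
}
```

   Using an `EnumSet` for the PING/PONG exemption keeps the heartbeat check in one place; note that the earlier revision quoted below combines the two `==` comparisons with `&&`, which can never be true.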



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownLimitRate = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines the"

Review Comment:
   "period of time" -> "period of time defined by pulsarChannelPauseReceivingCooldownSeconds"



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +450,37 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         }
     }
 
+    protected void checkRateLimit(BaseCommand cmd) {
+        if (cmd.getType() == BaseCommand.Type.PONG && cmd.getType() == BaseCommand.Type.PING) {
+            return;
+        }
+        if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {

Review Comment:
   The comment about "Please ensure that this is a no-op when this feature is 
disabled" hasn't been addressed yet. I also added a new comment about 15 lines 
above.



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;

Review Comment:
   I think I added a comment about the granularity of this setting, but I no longer see the previous comment due to changes. Seconds might be too coarse a unit for tuning purposes; using milliseconds from the beginning would resolve that.
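   A small sketch of why the unit matters, assuming the cooldown is applied as a time window similar to `requestRateLimiter.timingOpen(duration, unit)` in the quoted `ServerCnx` code. `CooldownWindowSketch` and its methods are hypothetical. With a millisecond setting a 250 ms cooldown is expressible, while a seconds-based setting cannot go below 1 s other than through values that disable the feature (`<= 0` in the quoted check).

```java
import java.util.concurrent.TimeUnit;

public class CooldownWindowSketch {
    private long openUntilNanos;
    private boolean opened;

    // Hypothetical analogue of requestRateLimiter.timingOpen(duration, unit): starts a
    // cooldown window at 'nowNanos' that lasts for the given duration in any TimeUnit.
    void open(long nowNanos, long duration, TimeUnit unit) {
        openUntilNanos = nowNanos + unit.toNanos(duration);
        opened = true;
    }

    // Overflow-safe comparison, as recommended for System.nanoTime() values.
    boolean isOpen(long nowNanos) {
        return opened && nowNanos - openUntilNanos < 0;
    }

    public static void main(String[] args) {
        long t200ms = TimeUnit.MILLISECONDS.toNanos(200);
        long t300ms = TimeUnit.MILLISECONDS.toNanos(300);

        // A millisecond setting can express a 250 ms cooldown.
        CooldownWindowSketch millis = new CooldownWindowSketch();
        millis.open(0, 250, TimeUnit.MILLISECONDS);
        System.out.println(millis.isOpen(t200ms)); // true
        System.out.println(millis.isOpen(t300ms)); // false

        // The smallest enabled seconds-based cooldown is a full second.
        CooldownWindowSketch seconds = new CooldownWindowSketch();
        seconds.open(0, 1, TimeUnit.SECONDS);
        System.out.println(seconds.isOpen(t300ms)); // true
    }
}
```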



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         }
     }
 
+    private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
+        if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG
+                || cmd.getType() == BaseCommand.Type.PING) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                log.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel isWritable: {}",
+                        cmd.getType(), outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable());
+            } else {
+                log.debug("Start to handle request [{}], channel isWritable: {}",
+                        cmd.getType(), ctx.channel().isWritable());
+            }
+        }
+        // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called
+        // "timingOpen(duration)").
+        if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
+            log.warn("[{}] Reached rate limitation", this);
+            // Stop receiving requests.
+            pausedDueToRateLimitation = true;
+            ctx.channel().config().setAutoRead(false);
+            // Resume after 1 second.
+            ctx.channel().eventLoop().schedule(() -> {
+                if (pausedDueToRateLimitation) {
+                    log.info("[{}] Resuming connection after rate limitation", this);
+                    ctx.channel().config().setAutoRead(true);
+                    pausedDueToRateLimitation = false;
+                }
+            }, 1, TimeUnit.SECONDS);
+        }
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
+        if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+            log.info("[{}] is writable, turn on channel auto-read", this);
+            ctx.channel().config().setAutoRead(true);
+            requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, TimeUnit.SECONDS);
+        } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) {
+            final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                log.warn("[{}] is not writable, turn off channel auto-read, totalPendingWriteBytes: {}",
+                        this, outboundBuffer.totalPendingWriteBytes());
+            } else {
+                log.warn("[{}] is not writable, turn off channel auto-read", this);
+            }
+            }

Review Comment:
   I think that this will cause very verbose logging.
   Channel writability changes are completely normal in a Netty application. No warning should be printed when the writability changes, since it's not an error. There will always be cases where the broker fills the outbound buffer with very large amounts of data until there's a different type of flow control for dispatching to consumers. The dispatcher doesn't currently care whether the channel is writable, and that's also why Pulsar is really fast at the moment: it buffers data and can utilize the network very efficiently because of this. The flow control is currently based on `managedLedgerMaxReadsInFlightSizeInMB`. The main weakness is that the solution doesn't have a concept of "fairness". A single slow consumer can hog the resources even when it's not making progress. That's why writability should also be used in dispatching, but the high and low water marks should be relatively high so that sufficient buffering happens. Since water marks can be adjusted separately for each channel instance, this could later become some sort of policy setting so that client connections that require high throughput can be configured with more buffering.
   This is why I think that adding PIP-434 is useful, and we will learn its impacts along the way when we start experimenting with how it behaves in practice. At this moment, I don't believe that the cooldown period is useful, since channel writability can switch back and forth constantly in a completely healthy situation. Flow control in Netty is like that when the server is pushing a lot of data (and the client cannot keep up).
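   To illustrate why writability can flip constantly on a healthy connection, here is a simplified, hypothetical model of Netty's water-mark hysteresis: the channel becomes unwritable once pending bytes exceed the high water mark and becomes writable again only after they drop below the low water mark. It is a sketch, not Netty's `ChannelOutboundBuffer`, and `WaterMarkSketch` is an illustrative name. The thresholds are the 32 KiB / 64 KiB defaults from the quoted `ServiceConfiguration` diff.

```java
public class WaterMarkSketch {
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingBytes;
    private boolean writable = true;
    private int writabilityChanges;

    WaterMarkSketch(long lowWaterMark, long highWaterMark) {
        if (lowWaterMark > highWaterMark) {
            throw new IllegalArgumentException("low water mark must not exceed high water mark");
        }
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    // Bytes queued for writing, e.g. messages dispatched to a consumer.
    void enqueue(long bytes) {
        pendingBytes += bytes;
        if (writable && pendingBytes > highWaterMark) {
            writable = false;      // a real channel would fire channelWritabilityChanged here
            writabilityChanges++;
        }
    }

    // Bytes actually flushed to the socket.
    void flushed(long bytes) {
        pendingBytes -= bytes;
        if (!writable && pendingBytes < lowWaterMark) {
            writable = true;       // a real channel would fire channelWritabilityChanged here
            writabilityChanges++;
        }
    }

    boolean isWritable() { return writable; }

    int writabilityChanges() { return writabilityChanges; }

    public static void main(String[] args) {
        WaterMarkSketch ch = new WaterMarkSketch(32 * 1024, 64 * 1024);
        ch.enqueue(30 * 1024);                 // 30 KiB pending: still writable
        for (int i = 0; i < 10; i++) {
            ch.enqueue(40 * 1024);             // 70 KiB pending: above the high mark, unwritable
            ch.flushed(40 * 1024);             // 30 KiB pending: below the low mark, writable again
        }
        System.out.println(ch.writabilityChanges()); // 20
        System.out.println(ch.isWritable());         // true
    }
}
```

   Each transition corresponds to a `channelWritabilityChanged` callback, so with the logging in the quoted hunk this traffic pattern alone would produce ten WARN lines and ten INFO lines for a connection that is working normally.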



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownLimitRate = 5;

Review Comment:
   Why is this called "*LimitRate" when its matching pair is called "*RateLimitPeriod"?
   "*RateLimitPermits" and "*RateLimitPeriod" could be more descriptive.



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownLimitRate = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines the"
+            + " period of the rate limiter in milliseconds."

Review Comment:
   "period of the rate limiter" remains a concept that is hard to understand. There should be a way to explain it. One possibility is to provide an example to clarify it: "If the rate limit period is set to 1000, then the unit is requests per second. When it's 10, the unit is requests per 10 ms."

   Cross-referencing the "*RateLimitPermits" config (I suggested a rename) could also be useful, since it's a "pair" for this config.
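   The example suggested above can also be checked with a small sketch. This one assumes a simple fixed-window limiter, which may differ from the algorithm behind the PR's `requestRateLimiter`; `PermitsPerPeriodSketch` is hypothetical, and `permitsPerPeriod` / `periodMillis` stand in for the suggested "*RateLimitPermits" and "*RateLimitPeriod" settings. Offering one request per millisecond for one second shows how the same permit count yields very different rates depending on the period.

```java
import java.util.concurrent.TimeUnit;

public class PermitsPerPeriodSketch {
    private final int permitsPerPeriod; // hypothetical "*RateLimitPermits"
    private final long periodNanos;     // hypothetical "*RateLimitPeriod", configured in ms
    private long windowStartNanos;
    private int usedInWindow;
    private boolean started;

    PermitsPerPeriodSketch(int permitsPerPeriod, long periodMillis) {
        this.permitsPerPeriod = permitsPerPeriod;
        this.periodNanos = TimeUnit.MILLISECONDS.toNanos(periodMillis);
    }

    // Fixed-window limiting: at most permitsPerPeriod acquisitions per period.
    boolean tryAcquire(long nowNanos) {
        if (!started || nowNanos - windowStartNanos >= periodNanos) {
            started = true;
            windowStartNanos = nowNanos; // a new window starts at this request
            usedInWindow = 0;
        }
        if (usedInWindow < permitsPerPeriod) {
            usedInWindow++;
            return true;
        }
        return false;
    }

    // Offers one request per millisecond for one second and counts how many are admitted.
    static int admittedInOneSecond(int permits, long periodMillis) {
        PermitsPerPeriodSketch limiter = new PermitsPerPeriodSketch(permits, periodMillis);
        int admitted = 0;
        for (long ms = 0; ms < 1000; ms++) {
            if (limiter.tryAcquire(TimeUnit.MILLISECONDS.toNanos(ms))) {
                admitted++;
            }
        }
        return admitted;
    }

    public static void main(String[] args) {
        System.out.println(admittedInOneSecond(5, 1000)); // 5: five requests per second
        System.out.println(admittedInOneSecond(5, 10));   // 500: five requests per 10 ms
    }
}
```

   A token bucket would spread the permits more evenly within each period; the fixed window is used here only because it keeps the permits/period relationship easy to read.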



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
