lhotari commented on code in PR #24423: URL: https://github.com/apache/pulsar/pull/24423#discussion_r2373384002
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { + if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG Review Comment: The later comment about "Please ensure that this is a no-op when this feature is disabled" hasn't been addressed yet. I would assume that it could be addressed by adding `!pulsarChannelPauseReceivingRequestsIfUnwritable ||` here. ```suggestion if (!pulsarChannelPauseReceivingRequestsIfUnwritable || rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java: ########## @@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int brokerMaxConnections = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"." + ) + private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer is smaller than this value, channel writable state will start to return" + + " \"true\"." + ) + private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer" + + " buffer is full, until it is changed to writable." 
+ ) + private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" + + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines" + + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out" + + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + + " value will disable the rate limiting." + ) + private int pulsarChannelPauseReceivingCooldownSeconds = 5; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how" + + " many requests should be allowed in the rate limiting period." + + ) + private int pulsarChannelPauseReceivingCooldownLimitRate = 5; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines the" Review Comment: "period of time" -> "period of time defined by pulsarChannelPauseReceivingCooldownSeconds" ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -438,11 +450,37 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + protected void checkRateLimit(BaseCommand cmd) { + if (cmd.getType() == BaseCommand.Type.PONG && cmd.getType() == BaseCommand.Type.PING) { + return; + } + if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) { Review Comment: The comment about "Please ensure that this is a no-op when this feature is disabled" hasn't been addressed yet.
I also added a new comment about 15 lines above. ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java: ########## @@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int brokerMaxConnections = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"." + ) + private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer is smaller than this value, channel writable state will start to return" + + " \"true\"." + ) + private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer" + + " buffer is full, until it is changed to writable." + ) + private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" + + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines" + + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out" + + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + + " value will disable the rate limiting." 
+ ) + private int pulsarChannelPauseReceivingCooldownSeconds = 5; Review Comment: I think I added a comment about the granularity of this setting, but I no longer see the previous comment due to changes. Seconds might be too coarse for tuning purposes; using milliseconds from the beginning would resolve that. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } + private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { + if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG + || cmd.getType() == BaseCommand.Type.PING) { + return; + } + if (log.isDebugEnabled()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel isWritable: {}", + cmd.getType(), outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable()); + } else { + log.debug("Start to handle request [{}], channel isWritable: {}", + cmd.getType(), ctx.channel().isWritable()); + } + } + // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called + // "timingOpen(duration)"). + if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) { + log.warn("[{}] Reached rate limitation", this); + // Stop receiving requests. + pausedDueToRateLimitation = true; + ctx.channel().config().setAutoRead(false); + // Resume after 1 second.
+ ctx.channel().eventLoop().schedule(() -> { + if (pausedDueToRateLimitation) { + log.info("[{}] Resuming connection after rate limitation", this); + ctx.channel().config().setAutoRead(true); + pausedDueToRateLimitation = false; + } + }, 1, TimeUnit.SECONDS); + } + } + @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (log.isDebugEnabled()) { - log.debug("Channel writability has changed to: {}", ctx.channel().isWritable()); + if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) { + log.info("[{}] is writable, turn on channel auto-read", this); + ctx.channel().config().setAutoRead(true); + requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, TimeUnit.SECONDS); + } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) { + final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer(); + if (outboundBuffer != null) { + log.warn("[{}] is not writable, turn off channel auto-read, totalPendingWriteBytes: {}", + this, outboundBuffer.totalPendingWriteBytes()); + } else { + log.warn("[{}] is not writable, turn off channel auto-read", this); + } Review Comment: I think that this will cause very verbose logging. Channel writability changes are completely normal in a Netty application. No warning should be printed when the writability changes since it's not an error. There will always be cases where the broker will be filling the outbound buffer with very high amounts until there's a different type of flow control for dispatching to consumers. The dispatcher doesn't currently care if the channel is writable or not and that's also why Pulsar is really fast at the moment. It will buffer data and it can utilize the network very efficiently because of this. The flow control is currently based on `managedLedgerMaxReadsInFlightSizeInMB`. The main weakness is that the solution doesn't have a concept of "fairness". 
A single slow consumer can hog the resources even when it's not making progress. That's why writability should also be used in dispatching, but the high and low water marks should be relatively high so that sufficient buffering happens. Since water marks can be adjusted separately for each channel instance, this could later become some sort of policy setting, so that client connections that require high throughput could be configured with more buffering. This is why I think that adding PIP-434 is useful, and we will learn its impact along the way when we start experimenting with how it behaves in practice. At this moment, I don't believe that the cooldown period is useful, since channel writability can switch back and forth constantly in a completely healthy situation. Flow control in Netty behaves like that when the server is pushing a lot of data (and the client cannot keep up). ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java: ########## @@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int brokerMaxConnections = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"." + ) + private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer is smaller than this value, channel writable state will start to return" + + " \"true\"."
+ ) + private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer" + + " buffer is full, until it is changed to writable." + ) + private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" + + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines" + + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out" + + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + + " value will disable the rate limiting." + ) + private int pulsarChannelPauseReceivingCooldownSeconds = 5; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how" + + " many requests should be allowed in the rate limiting period." + + ) + private int pulsarChannelPauseReceivingCooldownLimitRate = 5; Review Comment: Why is this called "*LimitRate" and the matching pair of this is "*RateLimitPeriod"? "*RateLimitPermits" and "*RateLimitPeriod" could possibly be more descriptive. ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java: ########## @@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int brokerMaxConnections = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. 
If the number of bytes" + + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"." + ) + private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes" + + " queued in the write buffer is smaller than this value, channel writable state will start to return" + + " \"true\"." + ) + private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer" + + " buffer is full, until it is changed to writable." + ) + private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" + + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines" + + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out" + + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + + " value will disable the rate limiting." + ) + private int pulsarChannelPauseReceivingCooldownSeconds = 5; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how" + + " many requests should be allowed in the rate limiting period." 
+ + ) + private int pulsarChannelPauseReceivingCooldownLimitRate = 5; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines the" + + " period of the rate limiter in milliseconds." Review Comment: "period of the rate limiter" remains a concept that is hard to understand. There should be a way to explain it. One possibility is to provide an example to clarify it: "If the rate limit period is set to 1000, then the unit is requests per second. When it's 10, the unit is requests per 10 ms." Cross-referencing the "*RateLimitPermits" config (I suggested a rename) could also be useful, since it's the "pair" of this config.
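To make the "permits per period" pairing from the last comment concrete, here is a minimal sketch, assuming a plain fixed-window limiter. The class `FixedWindowLimiter` and its members are hypothetical and are not the PR's `requestRateLimiter`; the sketch only illustrates how a permits value and a period in milliseconds together define the effective rate.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical fixed-window limiter: at most `permits` requests per `periodMillis`. */
final class FixedWindowLimiter {
    private final int permits;       // plays the role of the suggested "*RateLimitPermits"
    private final long periodNanos;  // plays the role of the suggested "*RateLimitPeriod" (given in ms)
    private long windowStartNanos;
    private int used;

    FixedWindowLimiter(int permits, long periodMillis, long nowNanos) {
        this.permits = permits;
        this.periodNanos = TimeUnit.MILLISECONDS.toNanos(periodMillis);
        this.windowStartNanos = nowNanos;
    }

    /** Returns true if one more request fits into the window that contains `nowNanos`. */
    boolean tryAcquire(long nowNanos) {
        if (nowNanos - windowStartNanos >= periodNanos) {
            // The period has elapsed: start a new window with a fresh budget.
            windowStartNanos = nowNanos;
            used = 0;
        }
        if (used < permits) {
            used++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // permits = 5, period = 1000 ms: the unit is "requests per second".
        FixedWindowLimiter limiter = new FixedWindowLimiter(5, 1000, 0L);
        int granted = 0;
        for (int i = 0; i < 10; i++) {
            if (limiter.tryAcquire(0L)) {
                granted++;
            }
        }
        System.out.println("granted in the first window: " + granted);
    }
}
```

With a period of 10 and the same permits value, the same code would allow 5 requests per 10 ms window, which is the kind of worked example the config doc could carry.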