This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new e166f75  Disable channel auto read when publish rate or publish buffer exceeded (#6550)
e166f75 is described below

commit e166f75ea7dfcaf1170d89955fe2011a3c8dd9c4
Author: lipenghui <[email protected]>
AuthorDate: Thu Mar 19 11:55:16 2020 +0800

    Disable channel auto read when publish rate or publish buffer exceeded (#6550)
    
    Disable channel auto-read when the publish rate or publish buffer is exceeded. Currently, ServerCnx sets channel auto-read to false only when it receives a new message and the publish rate or publish buffer has been exceeded, so each connection reads one more message before throttling takes effect. If there are many ServerCnx instances (many topics or clients), the effective publish rate deviates widely from the configured limit. Here is an example that shows the problem.
    
    Enable publish rate limit in broker.conf
    ```
    brokerPublisherThrottlingTickTimeMillis=1
    brokerPublisherThrottlingMaxByteRate=10000000
    ```
    
    Use pulsar-perf to test message publishing to a topic with 100 partitions:
    ```
    bin/pulsar-perf produce -s 500000 -r 100000 -t 1 100p
    ```
    
    The test result:
    ```
    10:45:28.844 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    367.8  msg/s ---   1402.9 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 710.008 ms - med: 256.969 - 95pct: 2461.439 - 99pct: 3460.255 - 99.9pct: 4755.007 - 99.99pct: 4755.007 - Max: 4755.007
    10:45:38.919 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    456.6  msg/s ---   1741.9 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 2551.341 ms - med: 2347.599 - 95pct: 6852.639 - 99pct: 9630.015 - 99.9pct: 10824.319 - 99.99pct: 10824.319 - Max: 10824.319
    10:45:48.959 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    432.0  msg/s ---   1648.0 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 4373.505 ms - med: 3972.047 - 95pct: 11754.687 - 99pct: 15713.663 - 99.9pct: 17638.527 - 99.99pct: 17705.727 - Max: 17705.727
    10:45:58.996 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    430.6  msg/s ---   1642.6 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 5993.563 ms - med: 4291.071 - 95pct: 18022.527 - 99pct: 21649.663 - 99.9pct: 24885.375 - 99.99pct: 25335.551 - Max: 25335.551
    10:46:09.195 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    403.2  msg/s ---   1538.3 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 7883.304 ms - med: 6184.159 - 95pct: 23625.343 - 99pct: 29524.991 - 99.9pct: 30813.823 - 99.99pct: 31467.775 - Max: 31467.775
    10:46:19.314 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    401.1  msg/s ---   1530.1 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 9587.407 ms - med: 6907.007 - 95pct: 28524.927 - 99pct: 34815.999 - 99.9pct: 36759.551 - 99.99pct: 37581.567 - Max: 37581.567
    10:46:29.389 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    372.8  msg/s ---   1422.0 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 11984.595 ms - med: 10095.231 - 95pct: 34515.967 - 99pct: 40754.175 - 99.9pct: 43553.535 - 99.99pct: 43603.199 - Max: 43603.199
    10:46:39.459 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    374.6  msg/s ---   1429.1 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 12208.459 ms - med: 7807.455 - 95pct: 38799.871 - 99pct: 46936.575 - 99.9pct: 50500.095 - 99.99pct: 50500.095 - Max: 50500.095
    10:46:49.537 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    295.6  msg/s ---   1127.5 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 14503.565 ms - med: 10753.087 - 95pct: 45041.407 - 99pct: 54307.327 - 99.9pct: 57786.623 - 99.99pct: 57786.623 - Max: 57786.623
    ```
    
    The large deviation occurs because the producer sends batched messages and each ServerCnx reads one more message before its auto-read is disabled.
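    
    For scale, the sample figures above can be compared with the configured limit. The sketch below assumes that `-s 500000` is the message size in bytes and that `brokerPublisherThrottlingMaxByteRate` is expressed in bytes per second; the class and method names are illustrative and not part of Pulsar.
    
    ```java
    public class ThrottleDeviation {
        // Byte rate implied by a message rate and a fixed message size.
        static double bytesPerSecond(double msgPerSecond, long messageSizeBytes) {
            return msgPerSecond * messageSizeBytes;
        }

        // How many times the observed byte rate exceeds the configured limit.
        static double overshootFactor(double observedBytesPerSecond, long limitBytesPerSecond) {
            return observedBytesPerSecond / limitBytesPerSecond;
        }

        public static void main(String[] args) {
            long limit = 10_000_000L;   // brokerPublisherThrottlingMaxByteRate (assumed bytes/s)
            long size = 500_000L;       // pulsar-perf -s (assumed bytes per message)
            double observed = bytesPerSecond(367.8, size); // first sample in the test result
            System.out.printf("observed=%.0f B/s, overshoot=%.1fx%n",
                    observed, overshootFactor(observed, limit));
        }
    }
    ```
    
    Under these assumptions, the first sample (367.8 msg/s) works out to roughly 18 times the configured byte rate.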
    
    This PR cannot completely solve the problem, but it alleviates it. When the message publish rate is exceeded, the broker sets channel auto-read to false for the producers of all topics, which prevents some of the ServerCnx instances from reading one more message.
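    
    The difference between the two approaches can be illustrated with a toy model. This is a simplified sketch, not the broker code: `Cnx` is a hypothetical stand-in for ServerCnx that only tracks an auto-read flag, and the model assumes every connection has a message (or batch) arrive after the limit has already been exceeded.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class AutoReadSketch {
        // Hypothetical stand-in for a ServerCnx; only models the auto-read flag.
        static class Cnx {
            boolean autoRead = true;
            int messagesRead = 0;

            // Per-message check: the limit is noticed only when a message arrives,
            // so that message is still read before auto-read is switched off.
            void onMessage(boolean rateExceeded) {
                if (!autoRead) {
                    return;
                }
                messagesRead++;
                if (rateExceeded) {
                    autoRead = false;
                }
            }

            // Up-front disable, in the spirit of disableCnxAutoRead() in the diff.
            void disableAutoRead() {
                if (autoRead) {
                    autoRead = false;
                }
            }
        }

        // Counts messages read after the limit is already exceeded.
        static int extraMessages(int connections, boolean disableUpFront) {
            List<Cnx> cnxs = new ArrayList<>();
            for (int i = 0; i < connections; i++) {
                cnxs.add(new Cnx());
            }
            if (disableUpFront) {
                cnxs.forEach(Cnx::disableAutoRead);
            }
            // Two rounds of arrivals on every connection.
            for (int round = 0; round < 2; round++) {
                for (Cnx c : cnxs) {
                    c.onMessage(true);
                }
            }
            return cnxs.stream().mapToInt(c -> c.messagesRead).sum();
        }

        public static void main(String[] args) {
            System.out.println(extraMessages(100, false));
            System.out.println(extraMessages(100, true));
        }
    }
    ```
    
    In this idealized model, 100 connections admit 100 extra messages under the per-message check and none when auto-read is disabled up front. In the real broker the rate check presumably runs on a tick, so some connections can still read one more message in between, which is consistent with the change only alleviating the problem.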
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
      - Does this pull request introduce a new feature? (no)
    
    (cherry picked from commit ec31d549e7aacf8b8fe54be650adc3b36e04446c)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java    |  6 ++++++
 .../org/apache/pulsar/broker/service/BrokerService.java    |  3 +++
 .../java/org/apache/pulsar/broker/service/ServerCnx.java   | 14 ++++++++++----
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 638c8c7..42d30d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -296,6 +296,12 @@ public abstract class AbstractTopic implements Topic {
         }
     }
 
+    protected void disableProducerRead() {
+        if (producers != null) {
+            producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
+        }
+    }
+
     protected void checkTopicFenced() throws BrokerServiceException {
         if (isFenced) {
             log.warn("[{}] Attempting to add producer to a fenced topic", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index eb8a05f..be51fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1129,6 +1129,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     public void checkBrokerPublishThrottlingRate() {
         brokerPublishRateLimiter.checkPublishRate();
+        if (brokerPublishRateLimiter.isPublishRateExceeded()) {
+            forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead());
+        }
     }
 
     private void refreshBrokerPublishRate() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2a78508..89bf17c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 
 import io.netty.buffer.ByteBuf;
@@ -143,7 +144,7 @@ public class ServerCnx extends PulsarHandler {
     private final boolean schemaValidationEnforced;
     private String authMethod = "none";
     private final int maxMessageSize;
-    
+
     // Flag to manage throttling-rate by atomically enable/disable read-channel.
     private volatile boolean autoReadDisabledRateLimiting = false;
 
@@ -1608,10 +1609,15 @@ public class ServerCnx extends PulsarHandler {
             ctx.channel().config().setAutoRead(true);
             // triggers channel read
             ctx.read();
-            autoReadDisabledRateLimiting = false;
         }
     }
-    
+
+    void disableCnxAutoRead() {
+        if (ctx.channel().config().isAutoRead() ) {
+            ctx.channel().config().setAutoRead(false);
+        }
+    }
+
     private <T> ServerError getErrorCode(CompletableFuture<T> future) {
         ServerError error = ServerError.UnknownError;
         try {
@@ -1695,4 +1701,4 @@ public class ServerCnx extends PulsarHandler {
     public String getClientVersion() {
         return clientVersion;
     }
-}
\ No newline at end of file
+}
