This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new f26fe8675 [ISSUE #5983] Make consumer support flow control code better (#5984) (#6395)
f26fe8675 is described below

commit f26fe86751629aa047275288a58a966156820340
Author: rongtong <[email protected]>
AuthorDate: Mon Mar 20 09:53:01 2023 +0800

    [ISSUE #5983] Make consumer support flow control code better (#5984) (#6395)
    
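    * When the broker returns the flow control code, pull again after 20ms instead of waiting for the default exception delay (3s for the push consumer, 1s for the lite pull consumer)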
    
    * When encountering the flow control code, pull it after 20ms instead of 3s
    
    (cherry picked from commit a1d1cf8b3061dbb550f7a23b304f2694697a76ee)
    
    # Conflicts:
    #       client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
    #       client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
---
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 23 +++++++++++++++-------
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 21 ++++++++++++++------
 .../rocketmq/common/protocol/ResponseCode.java     |  2 ++
 3 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 11cdbf6fb..8161e569a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -110,9 +111,13 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
      */
     private long pullTimeDelayMillsWhenException = 1000;
     /**
-     * Flow control interval
+     * Flow control interval when message cache is full
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 
50;
+    /**
+     * Flow control interval when broker return flow control
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 
20;
     /**
      * Delay some time when suspend pull service
      */
@@ -784,7 +789,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                 }
 
                 if ((long) consumeRequestCache.size() * 
defaultLitePullConsumer.getPullBatchSize() > 
defaultLitePullConsumer.getPullThresholdForAll()) {
-                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
                         log.warn("The consume request count exceeds threshold 
{}, so do flow control, consume request count={}, flowControlTimes={}", 
consumeRequestCache.size(), consumeRequestFlowControlTimes);
                     }
@@ -795,7 +800,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() 
/ (1024 * 1024);
 
                 if (cachedMessageCount > 
defaultLitePullConsumer.getPullThresholdForQueue()) {
-                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueFlowControlTimes++ % 1000) == 0) {
                         log.warn(
                             "The cached message count exceeds the threshold 
{}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
flowControlTimes={}",
@@ -805,7 +810,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                 }
 
                 if (cachedMessageSizeInMiB > 
defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
-                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueFlowControlTimes++ % 1000) == 0) {
                         log.warn(
                             "The cached message size exceeds the threshold {} 
MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
flowControlTimes={}",
@@ -815,7 +820,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                 }
 
                 if (processQueue.getMaxSpan() > 
defaultLitePullConsumer.getConsumeMaxSpan()) {
-                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                         log.warn(
                             "The queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
@@ -870,7 +875,11 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                 } catch (InterruptedException interruptedException) {
                     log.warn("Polling thread was interrupted.", 
interruptedException);
                 } catch (Throwable e) {
-                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
+                    if (e instanceof MQBrokerException && ((MQBrokerException) 
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                        pullDelayTimeMills = 
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
+                    } else {
+                        pullDelayTimeMills = pullTimeDelayMillsWhenException;
+                    }
                     log.error("An error occurred in pull message process.", e);
                 }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index df68284d4..bb40f855f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -60,6 +60,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -86,9 +87,13 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
      */
     private long pullTimeDelayMillsWhenException = 3000;
     /**
-     * Flow control interval
+     * Flow control interval when message cache is full
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 
50;
+    /**
+     * Flow control interval when broker return flow control
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 
20;
     /**
      * Delay some time when suspend pull service
      */
@@ -238,7 +243,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 
* 1024);
 
         if (cachedMessageCount > 
this.defaultMQPushConsumer.getPullThresholdForQueue()) {
-            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
                     "the cached message count exceeds the threshold {}, so do 
flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
@@ -248,7 +253,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
 
         if (cachedMessageSizeInMiB > 
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
-            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
             if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
                     "the cached message size exceeds the threshold {} MiB, so 
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
@@ -259,7 +264,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
 
         if (!this.consumeOrderly) {
             if (processQueue.getMaxSpan() > 
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
-                this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+                this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
                 if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                     log.warn(
                         "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, 
flowControlTimes={}",
@@ -400,7 +405,11 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                     log.warn("execute the pull request exception", e);
                 }
 
-                
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
+                if (e instanceof MQBrokerException && ((MQBrokerException) 
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                    
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
+                } else {
+                    
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
+                }
             }
         };
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc744448f..f4adffdd5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,6 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
 
+    public static final int FLOW_CONTROL = 215;
+
 }

Reply via email to