This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new f26fe8675 [ISSUE #5983] Make consumer support flow control code better
(#5984) (#6395)
f26fe8675 is described below
commit f26fe86751629aa047275288a58a966156820340
Author: rongtong <[email protected]>
AuthorDate: Mon Mar 20 09:53:01 2023 +0800
[ISSUE #5983] Make consumer support flow control code better (#5984) (#6395)
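* When the broker returns the flow control code, the push consumer pulls again after 20ms instead of the default 3s exception delay
* When the broker returns the flow control code, the lite pull consumer pulls again after 20ms instead of the default 1s exception delay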
(cherry picked from commit a1d1cf8b3061dbb550f7a23b304f2694697a76ee)
# Conflicts:
#	client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
#	client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
---
.../impl/consumer/DefaultLitePullConsumerImpl.java | 23 +++++++++++++++-------
.../impl/consumer/DefaultMQPushConsumerImpl.java | 21 ++++++++++++++------
.../rocketmq/common/protocol/ResponseCode.java | 2 ++
3 files changed, 33 insertions(+), 13 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 11cdbf6fb..8161e569a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -110,9 +111,13 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
*/
private long pullTimeDelayMillsWhenException = 1000;
/**
- * Flow control interval
+ * Flow control interval when message cache is full
*/
- private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL =
50;
+ /**
+ * Flow control interval when broker return flow control
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL =
20;
/**
* Delay some time when suspend pull service
*/
@@ -784,7 +789,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
if ((long) consumeRequestCache.size() *
defaultLitePullConsumer.getPullBatchSize() >
defaultLitePullConsumer.getPullThresholdForAll()) {
- scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold
{}, so do flow control, consume request count={}, flowControlTimes={}",
consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
@@ -795,7 +800,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get()
/ (1024 * 1024);
if (cachedMessageCount >
defaultLitePullConsumer.getPullThresholdForQueue()) {
- scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message count exceeds the threshold
{}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
flowControlTimes={}",
@@ -805,7 +810,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
if (cachedMessageSizeInMiB >
defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
- scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"The cached message size exceeds the threshold {}
MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
flowControlTimes={}",
@@ -815,7 +820,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
if (processQueue.getMaxSpan() >
defaultLitePullConsumer.getConsumeMaxSpan()) {
- scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"The queue's messages, span too long, so do flow
control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
@@ -870,7 +875,11 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
} catch (InterruptedException interruptedException) {
log.warn("Polling thread was interrupted.",
interruptedException);
} catch (Throwable e) {
- pullDelayTimeMills = pullTimeDelayMillsWhenException;
+ if (e instanceof MQBrokerException && ((MQBrokerException)
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+ pullDelayTimeMills =
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
+ } else {
+ pullDelayTimeMills = pullTimeDelayMillsWhenException;
+ }
log.error("An error occurred in pull message process.", e);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index df68284d4..bb40f855f 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -60,6 +60,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -86,9 +87,13 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
*/
private long pullTimeDelayMillsWhenException = 3000;
/**
- * Flow control interval
+ * Flow control interval when message cache is full
*/
- private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL =
50;
+ /**
+ * Flow control interval when broker return flow control
+ */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL =
20;
/**
* Delay some time when suspend pull service
*/
@@ -238,7 +243,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024
* 1024);
if (cachedMessageCount >
this.defaultMQPushConsumer.getPullThresholdForQueue()) {
- this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do
flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
pullRequest={}, flowControlTimes={}",
@@ -248,7 +253,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
}
if (cachedMessageSizeInMiB >
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
- this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
pullRequest={}, flowControlTimes={}",
@@ -259,7 +264,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() >
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
- this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={},
flowControlTimes={}",
@@ -400,7 +405,11 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
log.warn("execute the pull request exception", e);
}
-
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
pullTimeDelayMillsWhenException);
+ if (e instanceof MQBrokerException && ((MQBrokerException)
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
+ } else {
+
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
pullTimeDelayMillsWhenException);
+ }
}
};
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc744448f..f4adffdd5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,6 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
+ public static final int FLOW_CONTROL = 215;
+
}