[
https://issues.apache.org/jira/browse/ROCKETMQ-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004614#comment-16004614
]
ASF GitHub Bot commented on ROCKETMQ-106:
-----------------------------------------
Github user shroman commented on a diff in the pull request:
https://github.com/apache/incubator-rocketmq/pull/66#discussion_r115726033
--- Diff:
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
---
@@ -218,25 +219,49 @@ public void pullMessage(final PullRequest pullRequest) {
             return;
         }
+        //flow control for topic
+        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != Integer.MAX_VALUE) {
+            Map<MessageQueue, ProcessQueue> allProcessQMap = this.getRebalanceImpl().getProcessQueueTable();
+            Iterator<Entry<MessageQueue, ProcessQueue>> it = allProcessQMap.entrySet().iterator();
+            long sizeOfAllQueue = 0;
+            //pick the relative process queues and calculate size
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueue> entry = it.next();
+                if (pullRequest.getMessageQueue().getTopic().equals(entry.getKey().getTopic())) {
+                    sizeOfAllQueue += entry.getValue().getMsgCount().get();
+                }
+            }
+            if (sizeOfAllQueue > this.defaultMQPushConsumer.getPullThresholdForTopic()) {
+                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+                if ((topicFlowControlTimes++ % 1000) == 0) {
+                    log.warn(
+                        "the consumer message topic buffer is full, so do flow control, minOffset={}, maxOffset={}, sizeOfAllQueue={}, pullRequest={}, flowControlTimes={}",
--- End diff --
`so do flow control` is very ambiguous. Let's give more concrete advice
on how to cope with the situation.
> Add flow control on topic level
> -------------------------------
>
> Key: ROCKETMQ-106
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-106
> Project: Apache RocketMQ
> Issue Type: Wish
> Components: rocketmq-client
> Reporter: Jaskey Lam
> Assignee: Jaskey Lam
> Fix For: 4.1.0-incubating
>
>
> *Motivations*
> With the current flow control, we can only set limits at the queue level.
> However, the number of queues allocated to an instance may change dynamically. For example,
> I might want at most 1000 messages to be pulled from the broker
> to protect my client, but I have no idea how many queues I will be allocated. Say
> I have 5 queues and 5 instances, so I set `pullThresholdForQueue`=1000,
> which works as expected while every instance is healthy. But as soon as any instance
> crashes, some instances may be allocated more than one queue, which
> makes the number of messages pulled from the broker exceed my expectations.
> A `pullThresholdForTopic` configuration is probably what most users want.
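
To make the scenario in the description concrete, here is a minimal sketch of a topic-level check, assuming the consumer tracks a locally cached message count per queue. `TopicFlowControlSketch`, `QueueCount`, `cachedForTopic` and `shouldDelayPull` are hypothetical names used only for illustration and are not part of the RocketMQ client; only the idea of summing the counts of all queues of one topic and comparing the sum with `pullThresholdForTopic` comes from the diff above.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: none of these types exist in the RocketMQ client.
final class TopicFlowControlSketch {

    /** Hypothetical stand-in for one allocated queue and its locally cached message count. */
    static final class QueueCount {
        final String topic;
        final int queueId;
        final long cached;

        QueueCount(String topic, int queueId, long cached) {
            this.topic = topic;
            this.queueId = queueId;
            this.cached = cached;
        }
    }

    /** Sums the cached message counts of every allocated queue that belongs to the topic. */
    static long cachedForTopic(List<QueueCount> allocated, String topic) {
        long total = 0;
        for (QueueCount q : allocated) {
            if (q.topic.equals(topic)) {
                total += q.cached;
            }
        }
        return total;
    }

    /** True when the next pull for this topic should be postponed. */
    static boolean shouldDelayPull(List<QueueCount> allocated, String topic, long pullThresholdForTopic) {
        return cachedForTopic(allocated, topic) > pullThresholdForTopic;
    }

    public static void main(String[] args) {
        // After another instance crashes, this instance holds two queues of TopicA.
        // With a per-queue limit of 1000, each queue may fill up independently.
        List<QueueCount> allocated = Arrays.asList(
            new QueueCount("TopicA", 0, 1000),
            new QueueCount("TopicA", 1, 1000));

        System.out.println("cached for TopicA = " + cachedForTopic(allocated, "TopicA"));
        System.out.println("delay pull at topic threshold 1000? "
            + shouldDelayPull(allocated, "TopicA", 1000));
    }
}
```

With only the per-queue limit, the two queues together can hold 2000 messages, twice the 1000 the user intended. A topic-level threshold caps the sum no matter how many queues the instance ends up with after rebalancing.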
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)