[ https://issues.apache.org/jira/browse/ROCKETMQ-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004614#comment-16004614 ]

ASF GitHub Bot commented on ROCKETMQ-106:
-----------------------------------------

Github user shroman commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/66#discussion_r115726033
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---
    @@ -218,25 +219,49 @@ public void pullMessage(final PullRequest pullRequest) {
                 return;
             }
     
    +        //flow control for topic
    +        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != Integer.MAX_VALUE) {
    +            Map<MessageQueue, ProcessQueue> allProcessQMap = this.getRebalanceImpl().getProcessQueueTable();
    +            Iterator<Entry<MessageQueue, ProcessQueue>> it = allProcessQMap.entrySet().iterator();
    +            long sizeOfAllQueue = 0;
    +            //pick the relative process queues and calculate size
    +            while (it.hasNext()) {
    +                Entry<MessageQueue, ProcessQueue> entry = it.next();
    +                if (pullRequest.getMessageQueue().getTopic().equals(entry.getKey().getTopic())) {
    +                    sizeOfAllQueue += entry.getValue().getMsgCount().get();
    +                }
    +            }
    +            if (sizeOfAllQueue > this.defaultMQPushConsumer.getPullThresholdForTopic()) {
    +                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    +                if ((topicFlowControlTimes++ % 1000) == 0) {
    +                    log.warn(
    +                        "the consumer message topic buffer is full, so do flow control, minOffset={}, maxOffset={}, sizeOfAllQueue={}, pullRequest={}, flowControlTimes={}",
    --- End diff --
    
    `so do flow control` is very ambiguous. Let's give more concrete advice
    on how to cope with the situation.


> Add flow control on topic level
> -------------------------------
>
>                 Key: ROCKETMQ-106
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-106
>             Project: Apache RocketMQ
>          Issue Type: Wish
>          Components: rocketmq-client
>            Reporter: Jaskey Lam
>            Assignee: Jaskey Lam
>             Fix For: 4.1.0-incubating
>
>
> *Motivations*
> Flow control can currently be configured only at the queue level.
> However, the number of queues allocated to a client may change dynamically. For example,
> I might want to ensure that at most 1000 messages are pulled from the broker,
> to protect my client, without knowing how many queues I will be allocated. Say
> there are 5 queues and 5 instances, so I set `pullThresholdForQueue`=1000,
> which works as expected while every instance is healthy. But as soon as any instance
> crashes, some instances may be allocated more than one queue, and the
> messages pulled from the broker will exceed my expectation.
> A `pullThresholdForTopic` setting is probably what most users want.
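
The scenario described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not the code from the pull request: `TopicFlowControlSketch`, `CachedQueue`, `cachedCountForTopic`, and `shouldDelayPull` are hypothetical names, and a plain list stands in for the client's process queue table. It shows an instance that took over a second queue after a crash, with each queue sitting at the per-queue limit of 1000, and a topic-level check that sums the counts across queues of the same topic.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicFlowControlSketch {

    /** Hypothetical snapshot of one locally cached queue; not a RocketMQ class. */
    public static final class CachedQueue {
        public final String topic;
        public final int queueId;
        public final long cachedMsgCount;

        public CachedQueue(String topic, int queueId, long cachedMsgCount) {
            this.topic = topic;
            this.queueId = queueId;
            this.cachedMsgCount = cachedMsgCount;
        }
    }

    /** Sums the cached message counts of every queue that belongs to the given topic. */
    public static long cachedCountForTopic(List<CachedQueue> queues, String topic) {
        long total = 0;
        for (CachedQueue q : queues) {
            if (q.topic.equals(topic)) {
                total += q.cachedMsgCount;
            }
        }
        return total;
    }

    /** Topic-level check: delay the pull when the summed count exceeds the threshold. */
    public static boolean shouldDelayPull(List<CachedQueue> queues, String topic,
                                          long pullThresholdForTopic) {
        return cachedCountForTopic(queues, topic) > pullThresholdForTopic;
    }

    public static void main(String[] args) {
        // After another instance crashed, this instance holds two queues of TopicA,
        // each already at the assumed per-queue limit of 1000 messages.
        List<CachedQueue> queues = new ArrayList<>();
        queues.add(new CachedQueue("TopicA", 0, 1000));
        queues.add(new CachedQueue("TopicA", 1, 1000));
        queues.add(new CachedQueue("TopicB", 0, 400));

        System.out.println(cachedCountForTopic(queues, "TopicA"));   // prints 2000
        System.out.println(shouldDelayPull(queues, "TopicA", 1000)); // prints true
    }
}
```

Neither queue exceeds its own limit here, so a per-queue check alone would not hold back the pull, while the topic-level sum is already twice the intended cap.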



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
