[ 
https://issues.apache.org/jira/browse/ROCKETMQ-110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongeforever updated ROCKETMQ-110:
----------------------------------
    Fix Version/s: 4.1.0-incubating

> consumeConcurrentlyMaxSpan has a very limited role
> --------------------------------------------------
>
>                 Key: ROCKETMQ-110
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-110
>             Project: Apache RocketMQ
>          Issue Type: Improvement
>          Components: rocketmq-client
>    Affects Versions: 4.0.0-incubating
>            Reporter: Jaskey Lam
>            Assignee: Xiaorui Wang
>             Fix For: 4.1.0-incubating
>
>
> Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the 
> role it plays is very limited.
> In my opinion, RocketMQ hopes to solve a problem that when some messages are 
> blocked( may be possibly considered as dead lock or dead loop),say message 
> with offset 100 is blocked,  and the rest whose offset are 101 to 2100 are 
> very healthy, but  in the case that if the process is killed (since it can 
> not be shutdown normally if dead lock or dead loop is really happens when 
> consuming), the later messages 101 to 2100 which is consumed sucessfully will 
> be repeated to consume again since the consumer offset will still remains at 
> 100.
> So to reduce the influence of repeated message numbers, flow control should 
> be taken.  
> But the current implementaion is to compare the span of last message of the 
> first message to consumeConcurrentlyMaxSpan.
> In the above example, the span is 2000 and flow control may do action to make 
> it pause for one cycle for 50ms, but next time when the message 2100 and the 
> rest of healthy message  consumed successfully , the fisrt key and the last 
> key will be the same, 110, and the max span will be considered as 0, pull 
> operation will continue.
> {code}
>     public long getMaxSpan() {
>         try {
>             this.lockTreeMap.readLock().lockInterruptibly();
>             try {
>                 if (!this.msgTreeMap.isEmpty()) {
>                     return this.msgTreeMap.lastKey() - 
> this.msgTreeMap.firstKey();
>                 }
>             } finally {
>                 this.lockTreeMap.readLock().unlock();
>             }
>         } catch (InterruptedException e) {
>             log.error("getMaxSpan exception", e);
>         }
>         return 0;
>     }
> {code}
> So for single message problem, consumeConcurrentlyMaxSpan will help nothing 
> but in my opion this is what flow control should also takes into 
> considerations.
> I suggest maxSpan should be lastConsumedOffset(does not record now) - 
> firstConsumingOffset(the first key of msgTreeMap).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to