[
https://issues.apache.org/jira/browse/ROCKETMQ-110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dongeforever updated ROCKETMQ-110:
----------------------------------
Fix Version/s: 4.1.0-incubating
> consumeConcurrentlyMaxSpan has a very limited role
> --------------------------------------------------
>
> Key: ROCKETMQ-110
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-110
> Project: Apache RocketMQ
> Issue Type: Improvement
> Components: rocketmq-client
> Affects Versions: 4.0.0-incubating
> Reporter: Jaskey Lam
> Assignee: Xiaorui Wang
> Fix For: 4.1.0-incubating
>
>
> Actually, in the current implementation of consumeConcurrentlyMaxSpan , the
> role it plays is very limited.
> In my opinion, RocketMQ hopes to solve a problem that when some messages are
> blocked( may be possibly considered as dead lock or dead loop),say message
> with offset 100 is blocked, and the rest whose offset are 101 to 2100 are
> very healthy, but in the case that if the process is killed (since it can
> not be shutdown normally if dead lock or dead loop is really happens when
> consuming), the later messages 101 to 2100 which is consumed sucessfully will
> be repeated to consume again since the consumer offset will still remains at
> 100.
> So to reduce the influence of repeated message numbers, flow control should
> be taken.
> But the current implementaion is to compare the span of last message of the
> first message to consumeConcurrentlyMaxSpan.
> In the above example, the span is 2000 and flow control may do action to make
> it pause for one cycle for 50ms, but next time when the message 2100 and the
> rest of healthy message consumed successfully , the fisrt key and the last
> key will be the same, 110, and the max span will be considered as 0, pull
> operation will continue.
> {code}
> public long getMaxSpan() {
> try {
> this.lockTreeMap.readLock().lockInterruptibly();
> try {
> if (!this.msgTreeMap.isEmpty()) {
> return this.msgTreeMap.lastKey() -
> this.msgTreeMap.firstKey();
> }
> } finally {
> this.lockTreeMap.readLock().unlock();
> }
> } catch (InterruptedException e) {
> log.error("getMaxSpan exception", e);
> }
> return 0;
> }
> {code}
> So for single message problem, consumeConcurrentlyMaxSpan will help nothing
> but in my opion this is what flow control should also takes into
> considerations.
> I suggest maxSpan should be lastConsumedOffset(does not record now) -
> firstConsumingOffset(the first key of msgTreeMap).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)