Jaskey Lam created ROCKETMQ-110:
-----------------------------------

             Summary: consumeConcurrentlyMaxSpan has a very limited role
                 Key: ROCKETMQ-110
                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-110
             Project: Apache RocketMQ
          Issue Type: Improvement
          Components: rocketmq-client
    Affects Versions: 4.0.0-incubating
            Reporter: Jaskey Lam
            Assignee: Xiaorui Wang


In the current implementation, consumeConcurrentlyMaxSpan plays only a very 
limited role.

As I understand it, RocketMQ is trying to solve the following problem. Suppose 
one message blocks (for example because of a deadlock or an infinite loop), say 
the message at offset 100, while the messages at offsets 101 to 2100 are 
consumed normally. If the process then has to be killed, because it cannot shut 
down normally while the deadlock or infinite loop persists, messages 101 to 
2100 will be consumed again even though they were already consumed 
successfully, since the consumer offset still remains at 100.

So flow control should kick in to limit the number of messages that get 
consumed again.

But the current implementation compares the span between the first and the 
last message in msgTreeMap with consumeConcurrentlyMaxSpan.

In the example above, the span is 2000, so flow control may pause pulling for 
one cycle of 50ms. But once message 2100 and the other healthy messages have 
been consumed successfully, the first key and the last key are both 100, the 
max span is computed as 0, and pulling continues.


    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }
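
To make the collapse concrete, here is a minimal standalone sketch of the 
scenario above. It is not RocketMQ code: the class MaxSpanDemo and the helper 
maxSpan are hypothetical names, the locking is left out, and only the 
arithmetic of getMaxSpan() is reproduced on a plain TreeMap.

```java
import java.util.TreeMap;

class MaxSpanDemo {
    // Same arithmetic as getMaxSpan() above, without the read lock.
    static long maxSpan(TreeMap<Long, String> msgTreeMap) {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> msgTreeMap = new TreeMap<>();

        // Offsets 100..2100 have been pulled and are waiting to be consumed.
        for (long offset = 100; offset <= 2100; offset++) {
            msgTreeMap.put(offset, "msg-" + offset);
        }
        System.out.println(maxSpan(msgTreeMap)); // 2000

        // Offsets 101..2100 are consumed and removed; offset 100 stays blocked.
        for (long offset = 101; offset <= 2100; offset++) {
            msgTreeMap.remove(offset);
        }
        System.out.println(maxSpan(msgTreeMap)); // 0
    }
}
```

With only the blocked message left in the map, the first key and the last key 
coincide, so the span drops to 0 no matter how far consumption has run ahead.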

So when a single message is stuck, consumeConcurrentlyMaxSpan does not help at 
all, but in my opinion this is a case that flow control should also take into 
account.

I suggest that maxSpan should be lastConsumedOffset - firstConsumingOffset 
(the first key of msgTreeMap).
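
A rough sketch of what this could look like, under my own assumptions: the 
class ProposedSpanSketch, the field lastConsumedOffset and the method 
markConsumed are hypothetical names for illustration, not existing RocketMQ 
API, and the read/write locking of the real queue is omitted for brevity.

```java
import java.util.TreeMap;

class ProposedSpanSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    // Hypothetical field: highest offset consumed successfully so far, -1 if none.
    private long lastConsumedOffset = -1;

    void put(long offset, String msg) {
        msgTreeMap.put(offset, msg);
    }

    // Called when a message has been consumed successfully.
    void markConsumed(long offset) {
        if (msgTreeMap.remove(offset) != null) {
            lastConsumedOffset = Math.max(lastConsumedOffset, offset);
        }
    }

    // Proposed span: lastConsumedOffset - firstConsumingOffset, never negative.
    long getMaxSpan() {
        if (msgTreeMap.isEmpty() || lastConsumedOffset < 0) {
            return 0;
        }
        return Math.max(0, lastConsumedOffset - msgTreeMap.firstKey());
    }

    public static void main(String[] args) {
        ProposedSpanSketch queue = new ProposedSpanSketch();
        for (long offset = 100; offset <= 2100; offset++) {
            queue.put(offset, "msg-" + offset);
        }
        // Offset 100 stays blocked; everything after it is consumed.
        for (long offset = 101; offset <= 2100; offset++) {
            queue.markConsumed(offset);
        }
        System.out.println(queue.getMaxSpan()); // 2000
    }
}
```

In this sketch the span stays at 2000 for as long as offset 100 is still being 
consumed, so the comparison against consumeConcurrentlyMaxSpan would keep 
seeing the gap instead of a span of 0.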




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
