[ 
https://issues.apache.org/jira/browse/ROCKETMQ-110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jaskey Lam updated ROCKETMQ-110:
--------------------------------
    Description: 
Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the 
role it plays is very limited.

In my opinion, RocketMQ hopes to solve a problem that when some messages are 
blocked( may be possibly considered as dead lock or dead loop),say message with 
offset 100 is blocked,  and the rest whose offset are 101 to 2100 are very 
healthy, in the case that if the process is killed since it can not be shutdown 
normally if dead lock or dead loop is really happens when consuming, the later 
messages 101 to 2100 which is consumed sucessfully will be repeated to consume 
again since the consumer offset will still remains at 100.

So to reduce the influence of repeated message numbers, flow control should be 
taken.  

But the current implementaion is to compare the span of last message of the 
first message to consumeConcurrentlyMaxSpan.

In the above example, the span is 2000 and flow control may do action to make 
it pause for one cycle for 50ms, but next time when the message 2100 and the 
rest of healthy message  consumed successfully , the fisrt key and the last key 
will be the same, 110, and the max span will be considered as 0, pull operation 
will continue.

{code}
    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - 
this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }
{code}

So for single message problem, consumeConcurrentlyMaxSpan will help nothing but 
in my opion this is what flow control should also takes into considerations.

I suggest maxSpan should be lastConsumedOffset(does not record now) - 
firstConsumingOffset(the first key of msgTreeMap).


  was:
Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the 
role it plays is very limited.

In my opinion, RocketMQ hopes to solve a problem that when some messages are 
blocked( may be possibly considered as dead lock or dead loop),say message with 
offset 100 is blocked,  and the rest whose offset is 101 to 2100 are very 
healthy, in the case that if the process is killed since it can not be shutdown 
normally if dead lock or dead loop is really happens when consuming, the later 
messages 101 to 2100 which is consumed sucessfully will be repeated to consume 
again since the consumer offset will still remains at 100.

So to reduce the influence of repeated message numbers, flow control should be 
taken.  

But the current implementaion is to compare the span of last message of the 
first message to consumeConcurrentlyMaxSpan.

In the above example, the span is 2000 and flow control may do action to make 
it pause for one cycle for 50ms, but next time when the message 2100 and the 
rest of healthy message  consumed successfully , the fisrt key and the last key 
will be the same, 110, and the max span will be considered as 0, pull operation 
will continue.

{code}
    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - 
this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }
{code}

So for single message problem, consumeConcurrentlyMaxSpan will help nothing but 
in my opion this is what flow control should also takes into considerations.

I suggest maxSpan should be lastConsumedOffset(does not record now) - 
firstConsumingOffset(the first key of msgTreeMap).



> consumeConcurrentlyMaxSpan has a very limited role
> --------------------------------------------------
>
>                 Key: ROCKETMQ-110
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-110
>             Project: Apache RocketMQ
>          Issue Type: Improvement
>          Components: rocketmq-client
>    Affects Versions: 4.0.0-incubating
>            Reporter: Jaskey Lam
>            Assignee: Xiaorui Wang
>
> Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the 
> role it plays is very limited.
> In my opinion, RocketMQ hopes to solve a problem that when some messages are 
> blocked( may be possibly considered as dead lock or dead loop),say message 
> with offset 100 is blocked,  and the rest whose offset are 101 to 2100 are 
> very healthy, in the case that if the process is killed since it can not be 
> shutdown normally if dead lock or dead loop is really happens when consuming, 
> the later messages 101 to 2100 which is consumed sucessfully will be repeated 
> to consume again since the consumer offset will still remains at 100.
> So to reduce the influence of repeated message numbers, flow control should 
> be taken.  
> But the current implementaion is to compare the span of last message of the 
> first message to consumeConcurrentlyMaxSpan.
> In the above example, the span is 2000 and flow control may do action to make 
> it pause for one cycle for 50ms, but next time when the message 2100 and the 
> rest of healthy message  consumed successfully , the fisrt key and the last 
> key will be the same, 110, and the max span will be considered as 0, pull 
> operation will continue.
> {code}
>     public long getMaxSpan() {
>         try {
>             this.lockTreeMap.readLock().lockInterruptibly();
>             try {
>                 if (!this.msgTreeMap.isEmpty()) {
>                     return this.msgTreeMap.lastKey() - 
> this.msgTreeMap.firstKey();
>                 }
>             } finally {
>                 this.lockTreeMap.readLock().unlock();
>             }
>         } catch (InterruptedException e) {
>             log.error("getMaxSpan exception", e);
>         }
>         return 0;
>     }
> {code}
> So for single message problem, consumeConcurrentlyMaxSpan will help nothing 
> but in my opion this is what flow control should also takes into 
> considerations.
> I suggest maxSpan should be lastConsumedOffset(does not record now) - 
> firstConsumingOffset(the first key of msgTreeMap).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to