[
https://issues.apache.org/jira/browse/ROCKETMQ-110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jaskey Lam updated ROCKETMQ-110:
--------------------------------
Description:
Actually, in the current implementation of consumeConcurrentlyMaxSpan , the
role it plays is very limited.
In my opinion, RocketMQ hopes to solve a problem that when some messages are
blocked( may be possibly considered as dead lock or dead loop),say message with
offset 100 is blocked, and the rest whose offset are 101 to 2100 are very
healthy, but in the case that if the process is killed since it can not be
shutdown normally if dead lock or dead loop is really happens when consuming,
the later messages 101 to 2100 which is consumed sucessfully will be repeated
to consume again since the consumer offset will still remains at 100.
So to reduce the influence of repeated message numbers, flow control should be
taken.
But the current implementaion is to compare the span of last message of the
first message to consumeConcurrentlyMaxSpan.
In the above example, the span is 2000 and flow control may do action to make
it pause for one cycle for 50ms, but next time when the message 2100 and the
rest of healthy message consumed successfully , the fisrt key and the last key
will be the same, 110, and the max span will be considered as 0, pull operation
will continue.
{code}
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey() -
this.msgTreeMap.firstKey();
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
}
return 0;
}
{code}
So for single message problem, consumeConcurrentlyMaxSpan will help nothing but
in my opion this is what flow control should also takes into considerations.
I suggest maxSpan should be lastConsumedOffset(does not record now) -
firstConsumingOffset(the first key of msgTreeMap).
was:
Actually, in the current implementation of consumeConcurrentlyMaxSpan , the
role it plays is very limited.
In my opinion, RocketMQ hopes to solve a problem that when some messages are
blocked( may be possibly considered as dead lock or dead loop),say message with
offset 100 is blocked, and the rest whose offset are 101 to 2100 are very
healthy, in the case that if the process is killed since it can not be shutdown
normally if dead lock or dead loop is really happens when consuming, the later
messages 101 to 2100 which is consumed sucessfully will be repeated to consume
again since the consumer offset will still remains at 100.
So to reduce the influence of repeated message numbers, flow control should be
taken.
But the current implementaion is to compare the span of last message of the
first message to consumeConcurrentlyMaxSpan.
In the above example, the span is 2000 and flow control may do action to make
it pause for one cycle for 50ms, but next time when the message 2100 and the
rest of healthy message consumed successfully , the fisrt key and the last key
will be the same, 110, and the max span will be considered as 0, pull operation
will continue.
{code}
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey() -
this.msgTreeMap.firstKey();
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
}
return 0;
}
{code}
So for single message problem, consumeConcurrentlyMaxSpan will help nothing but
in my opion this is what flow control should also takes into considerations.
I suggest maxSpan should be lastConsumedOffset(does not record now) -
firstConsumingOffset(the first key of msgTreeMap).
> consumeConcurrentlyMaxSpan has a very limited role
> --------------------------------------------------
>
> Key: ROCKETMQ-110
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-110
> Project: Apache RocketMQ
> Issue Type: Improvement
> Components: rocketmq-client
> Affects Versions: 4.0.0-incubating
> Reporter: Jaskey Lam
> Assignee: Xiaorui Wang
>
> Actually, in the current implementation of consumeConcurrentlyMaxSpan , the
> role it plays is very limited.
> In my opinion, RocketMQ hopes to solve a problem that when some messages are
> blocked( may be possibly considered as dead lock or dead loop),say message
> with offset 100 is blocked, and the rest whose offset are 101 to 2100 are
> very healthy, but in the case that if the process is killed since it can not
> be shutdown normally if dead lock or dead loop is really happens when
> consuming, the later messages 101 to 2100 which is consumed sucessfully will
> be repeated to consume again since the consumer offset will still remains at
> 100.
> So to reduce the influence of repeated message numbers, flow control should
> be taken.
> But the current implementaion is to compare the span of last message of the
> first message to consumeConcurrentlyMaxSpan.
> In the above example, the span is 2000 and flow control may do action to make
> it pause for one cycle for 50ms, but next time when the message 2100 and the
> rest of healthy message consumed successfully , the fisrt key and the last
> key will be the same, 110, and the max span will be considered as 0, pull
> operation will continue.
> {code}
> public long getMaxSpan() {
> try {
> this.lockTreeMap.readLock().lockInterruptibly();
> try {
> if (!this.msgTreeMap.isEmpty()) {
> return this.msgTreeMap.lastKey() -
> this.msgTreeMap.firstKey();
> }
> } finally {
> this.lockTreeMap.readLock().unlock();
> }
> } catch (InterruptedException e) {
> log.error("getMaxSpan exception", e);
> }
> return 0;
> }
> {code}
> So for single message problem, consumeConcurrentlyMaxSpan will help nothing
> but in my opion this is what flow control should also takes into
> considerations.
> I suggest maxSpan should be lastConsumedOffset(does not record now) -
> firstConsumingOffset(the first key of msgTreeMap).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)