MatrixHB commented on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1086058078
You said "there will still be other consumers repeatedly consuming the
message". Do you mean another consumer process, or another consume thread
within one consumer?
If you mean "other consumer process", there is nothing to worry about.
For details, see the client code
`org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ` and the broker
code
`org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch`.
Together they ensure that the orderly consumer of each queue stays fixed, and
load balancing will not cause the consumer of a queue to be replaced.
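To make the idea of a broker-side queue lock concrete, here is a minimal sketch. It is not the RocketMQ implementation: the class `BrokerQueueLockSketch`, the method `tryLock`, and the 60-second lifetime are all assumptions for illustration. It only shows the general technique, where a queue is owned by one client until that client stops renewing the lock.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a broker-side lock table; names and values are assumptions.
class BrokerQueueLockSketch {
    // Assumed lock lifetime, chosen only for this example.
    static final long LOCK_TTL_MILLIS = 60_000;

    private static final class Entry {
        final String clientId;
        final long lastUpdateMillis;

        Entry(String clientId, long lastUpdateMillis) {
            this.clientId = clientId;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    // queue name -> current lock owner
    private final ConcurrentHashMap<String, Entry> table = new ConcurrentHashMap<>();

    // Returns true if clientId owns the lock on the queue after this call.
    boolean tryLock(String queue, String clientId, long nowMillis) {
        Entry result = table.compute(queue, (q, cur) -> {
            boolean free = cur == null
                || nowMillis - cur.lastUpdateMillis > LOCK_TTL_MILLIS;
            if (free || cur.clientId.equals(clientId)) {
                // Acquire a free or expired lock, or renew our own lock.
                return new Entry(clientId, nowMillis);
            }
            // Another client holds a live lock: leave it untouched.
            return cur;
        });
        return result.clientId.equals(clientId);
    }
}
```

In this sketch a second client can take over a queue only after the first client has stopped renewing for longer than the assumed lifetime.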
If you mean "other consume thread in one consumer", please pay attention to
the synchronized lock in `ConsumeRequest`:
```
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    ....
    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
```
This ensures that only one ConsumeRequest thread can consume messages from
a given queue at any time. When a message is consumed, it is removed
from the ProcessQueue; see
`org.apache.rocketmq.client.impl.consumer.ProcessQueue#takeMessages`.
Therefore, another thread will not consume the same message again.
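
The combination of a per-queue lock object and taking messages out of the pending set can be sketched as follows. This is a simplified illustration under stated assumptions, not the client code itself: `QueueLockSketch`, `put`, `consumeOnce`, and `demo` are hypothetical, and `fetchLockObject` and `takeMessages` only borrow the names used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one pending set for a single queue, guarded by a per-queue lock.
class QueueLockSketch {
    private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();
    private final TreeMap<Long, String> pending = new TreeMap<>(); // offset -> body
    final List<String> consumed = new ArrayList<>(); // written only under the queue lock

    // One lock object per queue name, created on first use.
    Object fetchLockObject(String queue) {
        return locks.computeIfAbsent(queue, q -> new Object());
    }

    void put(long offset, String body) {
        synchronized (pending) {
            pending.put(offset, body);
        }
    }

    // Removes up to batchSize messages in offset order, so they cannot be taken twice.
    List<String> takeMessages(int batchSize) {
        List<String> batch = new ArrayList<>();
        synchronized (pending) {
            for (int i = 0; i < batchSize && !pending.isEmpty(); i++) {
                batch.add(pending.pollFirstEntry().getValue());
            }
        }
        return batch;
    }

    // Only one thread at a time takes and "consumes" messages of the given queue.
    void consumeOnce(String queue) {
        synchronized (fetchLockObject(queue)) {
            consumed.addAll(takeMessages(1));
        }
    }

    // Runs many consume tasks on several threads against one queue.
    static List<String> demo(int messages, int threads) {
        QueueLockSketch sketch = new QueueLockSketch();
        for (int i = 0; i < messages; i++) {
            sketch.put(i, "m" + i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            pool.submit(() -> sketch.consumeOnce("q0"));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consume tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.consumed;
    }
}
```

Calling `QueueLockSketch.demo(100, 4)` in this sketch yields each of the 100 messages exactly once and in offset order, because every take happens while holding the queue's lock.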