MatrixHB commented on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1086058078


   You said "there will still be other consumers repeatedly consuming the 
message",  do you mean other consumer process, or other consume thread in one 
consumer ?
   
   If you mean "other consumer process", nothing to worry about. 
   You can study further on the client code 
`org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ` and the broker 
code 
`org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch` 
. It can be ensured that sequential consumer for each queue is static,and 
Load-balancing will not cause consumer to be replaced for one queue .
   
   If you mean "other consume thread in one consumer", please pay attention to 
the synchronized lock in `ConsumeRequest`.
   ```
   final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
   synchronized (objLock) {
          ....
          status = 
messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
   }
   ```
   This ensures that only one ConsumeRequest thread can consume messages in 
each queue at the same time. When the message is consumed, it will be removed 
from ProcessQueue, seeing 
`org.apache.rocketmq.client.impl.consumer.ProcessQueue#takeMessages`. 
Therefore, it won't happen that another thread repeatedly consume the message. 
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to