MatrixHB commented on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1085943494


   In the client code for sequential consumption, three mechanisms prevent 
sequential messages from getting out of order.
   1. Lock each MessageQueue so that only one consumer holds it
   The client checks the following flag in `ProcessQueue`:
   ```
       public boolean isLocked() {
           return locked;
       }
   ```
   For more detail, read the client code 
`org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ` and the broker 
code 
`org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch`. 
They show how the sequential consumer of each queue is kept fixed, so that 
load balancing does not replace the consumer of a queue.
   Therefore, what you described, "there will still be other consumers 
repeatedly consuming the message", will not happen.
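
To make point 1 concrete, here is a minimal sketch of a broker-side lock table with one owner per queue. It is a simplified model, not the actual `RebalanceLockManager`: the class `QueueLockTable`, its method signature, and the 60-second lease are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a per-queue lock table.
// Not the real RebalanceLockManager; all names here are illustrative.
class QueueLockTable {
    private static final long LOCK_EXPIRY_MS = 60_000; // assumed lease duration

    private static final class Owner {
        final String clientId;
        final long lastUpdateMs;

        Owner(String clientId, long lastUpdateMs) {
            this.clientId = clientId;
            this.lastUpdateMs = lastUpdateMs;
        }
    }

    // queue name -> current owner of the lock
    private final ConcurrentHashMap<String, Owner> owners = new ConcurrentHashMap<>();

    // Returns true if clientId holds the lock on the queue after this call.
    boolean tryLock(String queue, String clientId, long nowMs) {
        Owner result = owners.compute(queue, (q, current) -> {
            boolean free = current == null
                    || nowMs - current.lastUpdateMs > LOCK_EXPIRY_MS;
            if (free || current.clientId.equals(clientId)) {
                // Acquire a free or expired lock, or renew our own lease.
                return new Owner(clientId, nowMs);
            }
            // Another client still holds a live lease: leave it untouched.
            return current;
        });
        return result.clientId.equals(clientId);
    }
}
```

In this model a second consumer cannot take over a queue while the first one keeps renewing its lease, so a rebalance alone does not put two consumers on the same queue.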
   
   2. Hold `treeMapLock` while multiple consume threads take messages concurrently
   See `org.apache.rocketmq.client.impl.consumer.ProcessQueue#takeMessages`:
   ```
               this.treeMapLock.writeLock().lockInterruptibly();
               try {
                   if (!this.msgTreeMap.isEmpty()) {
                       for (int i = 0; i < batchSize; i++) {
                           Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                           if (entry != null) {
                               result.add(entry.getValue());
                           } else {
                               break;
                           }
                       }
                   }
               } finally {
                   this.treeMapLock.writeLock().unlock();
               }
   ```
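
The excerpt above can be tried in isolation with the following sketch. It assumes a stand-in class `OrderedBuffer` that stores `String` bodies instead of `MessageExt`; only the lock-then-`pollFirstEntry` pattern is taken from the excerpt, everything else is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the message buffer of one queue.
class OrderedBuffer {
    private final ReentrantReadWriteLock treeMapLock = new ReentrantReadWriteLock();
    // Keyed by queue offset, so iteration order is offset order.
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    void put(long offset, String body) {
        treeMapLock.writeLock().lock();
        try {
            msgTreeMap.put(offset, body);
        } finally {
            treeMapLock.writeLock().unlock();
        }
    }

    // Removes and returns up to batchSize messages, lowest offset first.
    List<String> takeMessages(int batchSize) {
        List<String> result = new ArrayList<>();
        try {
            treeMapLock.writeLock().lockInterruptibly();
            try {
                for (int i = 0; i < batchSize; i++) {
                    Map.Entry<Long, String> entry = msgTreeMap.pollFirstEntry();
                    if (entry == null) {
                        break; // buffer drained
                    }
                    result.add(entry.getValue());
                }
            } finally {
                treeMapLock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what we have (nothing).
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```

Because the whole batch is polled under the write lock, two threads calling `takeMessages` at the same time each receive a contiguous run of offsets rather than interleaved entries.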
   
   3. Hold `consumeLock` while multiple consume threads consume messages concurrently
   ```
                               try {
                                   this.processQueue.getConsumeLock().lock();
                                   status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                               } finally {
                                   this.processQueue.getConsumeLock().unlock();
                               }
   ```
   Therefore, the lock you mentioned in this issue coordinates the multiple 
consume threads within the same consumer, and it can be released once 
consumption finishes.
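
As a rough illustration of point 3, the sketch below serializes two worker threads with a single `ReentrantLock`. The class `SerializedConsumer` and its methods are hypothetical and much simpler than the real consume service; the sketch only shows that a batch processed under the lock is never interleaved with another batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical consumer: one lock per queue serializes batch processing.
class SerializedConsumer {
    private final ReentrantLock consumeLock = new ReentrantLock();
    private final List<String> consumed = new ArrayList<>(); // guarded by consumeLock

    // Stand-in for calling the message listener on one batch.
    void consumeBatch(List<String> msgs) {
        consumeLock.lock();
        try {
            for (String msg : msgs) {
                consumed.add(msg);
            }
        } finally {
            consumeLock.unlock(); // released as soon as the batch is done
        }
    }

    List<String> consumedSnapshot() {
        consumeLock.lock();
        try {
            return new ArrayList<>(consumed);
        } finally {
            consumeLock.unlock();
        }
    }

    // Runs two consume threads against the same consumer and returns the log.
    static List<String> runTwoWorkers() {
        SerializedConsumer consumer = new SerializedConsumer();
        Thread a = new Thread(() -> consumer.consumeBatch(Arrays.asList("a1", "a2", "a3")));
        Thread b = new Thread(() -> consumer.consumeBatch(Arrays.asList("b1", "b2", "b3")));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.consumedSnapshot();
    }
}
```

Which batch runs first depends on thread scheduling, but each batch always appears as one contiguous run in the log.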
   
   If you have any questions, we can discuss more.
   
   
   
   

