d4ksn opened a new issue #2621:
URL: https://github.com/apache/rocketmq/issues/2621


   This is the RocketmqClient log my program printed out:
   ```
   [2021-01-23 17:40:48.148]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:the message queue lock OK, 
my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7] 
 ex:
   [2021-01-23 17:40:48.148]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:remove unnecessary messageQueue offset. 
group=my_consumer_group, mq=MessageQueue [topic=my_topic, brokerName=broker-a, 
queueId=7], offsetTableSize=0        ex:
   [2021-01-23 17:40:48.161]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:doRebalance, my_consumer_group, add a new 
mq, MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7] ex:
   [2021-01-23 17:40:48.172]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:the message queue lock OK, 
my_consumer_group MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6] 
 ex:
   [2021-01-23 17:40:48.172]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:remove unnecessary messageQueue offset. 
group=my_consumer_group, mq=MessageQueue [topic=my_topic, brokerName=broker-a, 
queueId=6], offsetTableSize=1        ex:
   [2021-01-23 17:40:48.185]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:doRebalance, my_consumer_group, add a new 
mq, MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6] ex:
   [2021-01-23 17:40:48.185]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:doRebalance, my_consumer_group, add a new 
pull request PullRequest [consumerGroup=my_consumer_group, 
messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=7], 
nextOffset=2542013]        ex:
   [2021-01-23 17:40:48.185]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:doRebalance, my_consumer_group, add a new 
pull request PullRequest [consumerGroup=my_consumer_group, 
messageQueue=MessageQueue [topic=my_topic, brokerName=broker-a, queueId=6], 
nextOffset=5950778]        ex:
   [2021-01-23 17:40:48.185]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:rebalanced result changed. 
allocateMessageQueueStrategyName=AVG, group=my_consumer_group, topic=my_topic, 
clientId=192.168.26.229@DefaultRocketMQListenerContainer_4, mqAllSize=8, 
cidAllSize=3, rebalanceResultSize=2, rebalanceResultSet=[MessageQueue 
[topic=my_topic, brokerName=broker-a, queueId=7], MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6]]   ex:
   [2021-01-23 17:40:48.185]    level:INFO              thread:RebalanceService 
class:RocketmqClient            msg:my_topic Rebalance changed, also update 
version: 1611394846526, 1611394848185       ex:
   [2021-01-23 17:40:48.185]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:40:48.185]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:40:51.186]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:40:51.187]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:40:54.187]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:40:54.188]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:40:57.188]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:40:57.188]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:41:00.189]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:41:00.189]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:41:03.190]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:41:03.190]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:41:06.191]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013]      ex:
   [2021-01-23 17:41:06.192]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:pull 
message later because not locked in broker, PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778]      ex:
   [2021-01-23 17:41:07.997]    level:INFO              
thread:ConsumeMessageScheduledThread_1  class:RocketmqClient            msg:the 
message queue locked OK, Group: my_consumer_group MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7] ex:
   [2021-01-23 17:41:07.997]    level:INFO              
thread:ConsumeMessageScheduledThread_1  class:RocketmqClient            msg:the 
message queue locked OK, Group: my_consumer_group MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6] ex:
   [2021-01-23 17:41:09.222]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:the first 
time to pull message, so fix offset from broker. pullRequest: PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=7], nextOffset=2542013] NewOffset: 2542013 
brokerBusy: false  ex:
   [2021-01-23 17:41:09.235]    level:INFO              
thread:PullMessageService       class:RocketmqClient            msg:the first 
time to pull message, so fix offset from broker. pullRequest: PullRequest 
[consumerGroup=my_consumer_group, messageQueue=MessageQueue [topic=my_topic, 
brokerName=broker-a, queueId=6], nextOffset=5950778] NewOffset: 5950778 
brokerBusy: false  ex:
   ```
   
   As seen from above, at 17:40:48, the two queues already "lock OK" first, and 
"add a new pull request", but then it said "pull message later because not 
locked in broker", so it went into a 3-second retry loop (but still failed like 
before).
   Almost 20 seconds later at 17:41:07, it finally said "the message queue 
locked OK", and started consuming messages normally.
   So, in the 17:40:48-17:41:07 time range, the messages in these 2 queues were 
kept in the broker, not delivered to the consumer, and caused a 20-second 
consumer delay.
   
   If you look into the code and debug, you will find that:
   In the first `RebalanceImpl#lock` invocation, the `processQueueTable` 
doesn't contain the entry for the new queue yet, so 
`processQueue.setLocked(true);` is not invoked.
   `processQueueTable.putIfAbsent(mq, pq);` is called only after 
`RebalanceImpl#lock` has returned.
   However, besides `RebalanceImpl#lock`, we also invoke 
`RebalanceImpl#lockAll` periodically, at a 20-second interval by default. So, 
`processQueue.setLocked(true);` finally get invoked after 20 seconds. But 
actually, the queue is already locked in the broker 20 seconds before.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to