leiy88 opened a new issue #3937:
URL: https://github.com/apache/rocketmq/issues/3937


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
   Hard to reproduce.
   
   2. Please tell us about your environment:
   - JDK 1.8 running on CentOS 7
   - rocketmq-client version 4.8.0
   - debugged with Arthas
   
   4. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc.):
   - Issue
   The consumer offsets of some queues stop advancing, e.g. queue 12 and queue 14.
   
![mqp](https://user-images.githubusercontent.com/19524201/157005726-4ab5590a-c892-4574-bd7b-6de9f9a305d7.png)
   - Analysis
   
   Pull requests keep going, and I can find client logs like this:
   
   ```
   WARN RocketmqClient - the queue's messages, span too long, so do flow control, minOffset=668629, maxOffset=669630, maxSpan=1001
   ```
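For context, this warning comes from the push consumer's span-based flow control for concurrent consumption: while the offset span of the messages cached in a ProcessQueue exceeds a limit, new pulls for that queue are postponed. Below is a minimal, simplified model, not RocketMQ's code. The class name and the 1000 threshold are hypothetical, and it assumes the span is the last cached offset minus the first, which matches the logged numbers (669630 - 668629 = 1001).

```java
import java.util.TreeMap;

public class SpanFlowControlDemo {
    // Hypothetical threshold; in RocketMQ the real limit is a consumer setting.
    static final long MAX_SPAN_THRESHOLD = 1000;

    // Cached messages keyed by queue offset (values are placeholders here).
    final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    // Span = highest cached offset minus lowest cached offset.
    long getMaxSpan() {
        if (msgTreeMap.isEmpty()) {
            return 0;
        }
        return msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    // Pulling is paused while the span is above the threshold.
    boolean shouldFlowControl() {
        return getMaxSpan() > MAX_SPAN_THRESHOLD;
    }

    public static void main(String[] args) {
        SpanFlowControlDemo pq = new SpanFlowControlDemo();
        // One message at the head that is never removed...
        pq.msgTreeMap.put(668629L, "stuck");
        // ...plus a newly pulled message much further ahead.
        pq.msgTreeMap.put(669630L, "new");
        // prints "maxSpan=1001, flowControl=true"
        System.out.println("maxSpan=" + pq.getMaxSpan() + ", flowControl=" + pq.shouldFlowControl());
    }
}
```

In this model the span can only shrink when the head entry leaves the map, so one stuck message keeps the queue under flow control indefinitely.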
   Debugging with Arthas shows that pulling also keeps going and nextOffset is the newest offset:
   ```
   ognl '@SpringUtils@getBean("xxxSubscriber").consumer.defaultMQPushConsumerImpl.mQClientFactory.pullMessageService.pullRequestQueue.take()' -c 5674cd4d
   @PullRequest[
     consumerGroup=@String[xxx],
     messageQueue=@MessageQueue[MessageQueue [topic=xxx, brokerName=broker3, queueId=30]],
     processQueue=@ProcessQueue[org.apache.rocketmq.client.impl.consumer.ProcessQueue@2386cd61],
     nextOffset=@Long[669749],
     lockedFirst=@Boolean[false],
   ]
   ```
   and the consumer thread pool is not busy:
   ```
   ognl '@SpringUtils@getBean("xxx").consumer.defaultMQPushConsumerImpl.consumeMessageService.consumeExecutor' -c 5674cd4d

   consumeExecutor=@ThreadPoolExecutor[java.util.concurrent.ThreadPoolExecutor@320f074e[Running, pool size = 20, active threads = 0, queued tasks = 0, completed tasks = 744439]]
   
   ``` 
   Then I looked at the ProcessQueue: some expired messages were stuck in it and would never be cleaned.
   ```
   ognl '@SpringUtils@getBean("xxx").consumer.defaultMQPushConsumerImpl.rebalanceImpl.processQueueTable' -c 2f2c9b19 -x 3 > ~/pq

   Here is what is in the TreeMap:
    @Long[683527]:@MessageClientExt[MessageExt [queueId=17, storeSize=1426, queueOffset=683527, sysFlag=0, bornTimestamp=1646575741924, bornHost=/192.168.112.9:14371, storeTimestamp=1646575741976, storeHost=/192.168.109.40:10912, msgId=C0A86D2800002AA000000738BECAA2F4, commitLogOffset=7940300514036, bodyCRC=1812790343, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='xxx', flag=0, properties={MIN_OFFSET=679961, MAX_OFFSET=684744, KEYS=998013830434914562, UNIQ_KEY=C0A870095FEF5674CD4D1E808FE4F79C, CLUSTER=Cluster2}
   ```
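Why one stuck message matters: with concurrent consumption, the offset committed after a batch is, as far as I can tell, the smallest offset still cached in the ProcessQueue TreeMap (or the max cached offset + 1 once the map is empty). The hypothetical model below, which is not the real ProcessQueue, shows that a single head message that is never removed pins the committed offset even when everything after it has been consumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class OffsetCommitDemo {
    // Cached messages keyed by queue offset.
    final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    long queueOffsetMax = -1;

    void put(long offset, String body) {
        msgTreeMap.put(offset, body);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Removes consumed offsets and returns the offset that would be committed:
    // the smallest offset still cached, or queueOffsetMax + 1 if nothing is left.
    // Returns -1 if the map was already empty.
    long removeMessage(List<Long> consumedOffsets) {
        if (msgTreeMap.isEmpty()) {
            return -1;
        }
        for (Long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        OffsetCommitDemo pq = new OffsetCommitDemo();
        for (long offset = 683527; offset <= 683530; offset++) {
            pq.put(offset, "msg-" + offset);
        }
        // Everything except the head message (683527) is consumed successfully.
        long committed = pq.removeMessage(Arrays.asList(683528L, 683529L, 683530L));
        System.out.println("committed offset = " + committed); // 683527
    }
}
```

Once the head entry is removed (for example by an expiry cleanup), the returned offset jumps straight to queueOffsetMax + 1, which fits the catch-up observed after triggering the cleanup by hand.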
   Why are the expired messages still here? Looking at cleanExpireMsgExecutors, there are no queued tasks. It has stopped!
   ```
   ognl '@SpringUtils@getBean("xxx").consumer.defaultMQPushConsumerImpl.consumeMessageService.cleanExpireMsgExecutors' -c 5674cd4d
   @DelegatedScheduledExecutorService[
       e=@ScheduledThreadPoolExecutor[java.util.concurrent.ScheduledThreadPoolExecutor@e419b6a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 13555]],
       e=@ScheduledThreadPoolExecutor[java.util.concurrent.ScheduledThreadPoolExecutor@e419b6a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 13555]],
       $assertionsDisabled=@Boolean[true],
   ]
   ```
   So I triggered cleanExpireMsg manually with Arthas. It worked: afterwards, the consumer offset caught up with the broker offset.
   ```
   ognl '@SpringUtils@getBean("newSmsCodeSubscriber").consumer.defaultMQPushConsumerImpl.consumeMessageService.cleanExpireMsg()' -c 5674cd4d
   ```
   But after doing this several times, I hit this exception:
   ```
   Caused by: java.lang.NumberFormatException: null
           at java.lang.Long.parseLong(Long.java:552)
           at java.lang.Long.parseLong(Long.java:631)
           at org.apache.rocketmq.client.impl.consumer.ProcessQueue.cleanExpiredMsg(ProcessQueue.java:87)
           at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.cleanExpireMsg(ConsumeMessageConcurrentlyService.java:270)
   ```
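On JDK 8, `Long.parseLong` reports a `null` input exactly as `NumberFormatException: null` (an empty string would give `For input string: ""` instead), so the value parsed at ProcessQueue.java:87 was missing rather than malformed. A minimal sketch of a null-safe expiry check follows. `isExpired` and its parameters are hypothetical, the 15-minute timeout is an assumed value, and treating a missing timestamp as "not expired" is only one possible choice, not necessarily what RocketMQ does.

```java
public class ConsumeStartTimeParseDemo {
    // Hypothetical null-safe expiry check; a missing start time counts as "not expired".
    static boolean isExpired(String consumeStartTime, long nowMillis, long timeoutMillis) {
        if (consumeStartTime == null || consumeStartTime.isEmpty()) {
            return false;
        }
        return nowMillis - Long.parseLong(consumeStartTime) > timeoutMillis;
    }

    public static void main(String[] args) {
        try {
            Long.parseLong(null);
        } catch (NumberFormatException e) {
            // The exact message text differs between JDK versions.
            System.out.println("Long.parseLong(null) threw " + e);
        }
        long timeout = 15 * 60 * 1000L; // assumed 15-minute timeout for the demo
        long start = 1646575741924L;    // bornTimestamp from the dump above, reused as a sample value
        System.out.println(isExpired(null, start, timeout));                                // false
        System.out.println(isExpired(String.valueOf(start), start + timeout + 1, timeout)); // true
    }
}
```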
   Looking back at the message in the TreeMap, it has no CONSUME_START_TIME property!
   So ProcessQueue.cleanExpiredMsg throws a NumberFormatException, which stops cleanExpireMsgExecutors from working!
   <img width="1106" alt="mqe" src="https://user-images.githubusercontent.com/19524201/157010049-ebba991f-0d9f-4d74-a0f4-efdde48a8737.png">
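This matches documented JDK behavior: for `ScheduledExecutorService.scheduleAtFixedRate`, if any execution of the task throws, subsequent executions are suppressed and the task is not re-queued, which would explain `queued tasks = 0` above. The self-contained demo below assumes the clean task is scheduled this way without its own try/catch. The class and method names are made up, and a `NumberFormatException` stands in for the real failure. Wrapping the task body in try/catch keeps the periodic task alive.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateSuppressionDemo {
    // Schedules a task that throws on every run and returns how many runs started.
    static int countRuns(boolean catchExceptions) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable cleanTask = () -> {
            runs.incrementAndGet();
            // Stand-in for the NumberFormatException thrown by cleanExpiredMsg.
            throw new NumberFormatException("null");
        };
        Runnable scheduled;
        if (catchExceptions) {
            scheduled = () -> {
                try {
                    cleanTask.run();
                } catch (RuntimeException e) {
                    // Swallowed here; a real service would log it.
                }
            };
        } else {
            scheduled = cleanTask;
        }
        executor.scheduleAtFixedRate(scheduled, 0, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        executor.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without try/catch: " + countRuns(false) + " run(s)");
        System.out.println("with try/catch:    " + countRuns(true) + " run(s)");
    }
}
```

Without the wrapper the task runs exactly once and then silently disappears from the executor, while the executor itself still reports `Running`.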
   
   
   But I still don't know why ProcessQueue.removeMessage in processConsumeResult is not working.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
