This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f20ce8a56 [ISSUE #5766] Fix possible message NPE (#5769)
f20ce8a56 is described below

commit f20ce8a563f8e9c9e9de70a57a9e2bc441a984f4
Author: xiaoyifang <[email protected]>
AuthorDate: Wed Jan 4 21:50:59 2023 +0800

    [ISSUE #5766] Fix possible message NPE (#5769)
---
 .../org/apache/rocketmq/client/impl/consumer/ProcessQueue.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index b6a4356a2..74238e024 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -79,7 +79,7 @@ public class ProcessQueue {
             return;
         }
 
-        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
+        int loop = Math.min(msgTreeMap.size(), 16);
         for (int i = 0; i < loop; i++) {
             MessageExt msg = null;
             try {
@@ -89,11 +89,7 @@ public class ProcessQueue {
                         String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
                         if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                             msg = msgTreeMap.firstEntry().getValue();
-                        } else {
-                            break;
                         }
-                    } else {
-                        break;
                     }
                 } finally {
                     this.treeMapLock.readLock().unlock();
@@ -102,6 +98,10 @@ public class ProcessQueue {
                 log.error("getExpiredMsg exception", e);
             }
 
+            if (msg == null) {
+                break;
+            }
+
             try {
 
                 pushConsumer.sendMessageBack(msg, 3);

Reply via email to