This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e425c09  [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
e425c09 is described below

commit e425c097ad6ab2e31786fc343805356363ddcf79
Author: ssssssnake <[email protected]>
AuthorDate: Mon Dec 20 20:07:45 2021 +0800

    [ISSUE #1097] Fix null pointer problem when consumption start time is null (#1098)
---
 .../apache/rocketmq/client/impl/consumer/ProcessQueue.java   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 21798d8..ba00aae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -85,10 +87,14 @@ public class ProcessQueue {
             try {
                 this.treeMapLock.readLock().lockInterruptibly();
                 try {
-                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
-                        msg = msgTreeMap.firstEntry().getValue();
+                    if (!msgTreeMap.isEmpty()) {
+                        String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
+                        if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
+                            msg = msgTreeMap.firstEntry().getValue();
+                        } else {
+                            break;
+                        }
                     } else {
-
                         break;
                     }
                 } finally {

Reply via email to