This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f20ce8a56 [ISSUE #5766] Fix possible message NPE (#5769)
f20ce8a56 is described below
commit f20ce8a563f8e9c9e9de70a57a9e2bc441a984f4
Author: xiaoyifang <[email protected]>
AuthorDate: Wed Jan 4 21:50:59 2023 +0800
[ISSUE #5766] Fix possible message NPE (#5769)
---
.../org/apache/rocketmq/client/impl/consumer/ProcessQueue.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index b6a4356a2..74238e024 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -79,7 +79,7 @@ public class ProcessQueue {
return;
}
- int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
+ int loop = Math.min(msgTreeMap.size(), 16);
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
@@ -89,11 +89,7 @@ public class ProcessQueue {
String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
- } else {
- break;
}
- } else {
- break;
}
} finally {
this.treeMapLock.readLock().unlock();
@@ -102,6 +98,10 @@ public class ProcessQueue {
log.error("getExpiredMsg exception", e);
}
+ if (msg == null) {
+ break;
+ }
+
try {
pushConsumer.sendMessageBack(msg, 3);