This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b638d4cfbb [ISSUE #8460] Set default broker name when revive found ack
without broker name field (#8981)
b638d4cfbb is described below
commit b638d4cfbbdfcbd6a8aa2feee580519f68d85730
Author: lizhimins <[email protected]>
AuthorDate: Mon Nov 25 19:24:43 2024 +0800
[ISSUE #8460] Set default broker name when revive found ack without broker
name field (#8981)
---
.../org/apache/rocketmq/broker/processor/PopReviveService.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index f27934efdf..e1ead86169 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
@@ -376,7 +377,9 @@ public class PopReviveService extends ServiceThread {
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
- String mergeKey = ackMsg.getTopic() +
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() +
ackMsg.getPopTime() + ackMsg.getBrokerName();
+ String brokerName =
StringUtils.isNotBlank(ackMsg.getBrokerName()) ?
+ ackMsg.getBrokerName() :
brokerController.getBrokerConfig().getBrokerName();
+ String mergeKey = ackMsg.getTopic() +
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() +
ackMsg.getPopTime() + brokerName;
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
@@ -401,7 +404,9 @@ public class PopReviveService extends ServiceThread {
BatchAckMsg bAckMsg = JSON.parseObject(raw,
BatchAckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(bAckMsg,
queueId);
- String mergeKey = bAckMsg.getTopic() +
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() +
bAckMsg.getPopTime() + bAckMsg.getBrokerName();
+ String brokerName =
StringUtils.isNotBlank(bAckMsg.getBrokerName()) ?
+ bAckMsg.getBrokerName() :
brokerController.getBrokerConfig().getBrokerName();
+ String mergeKey = bAckMsg.getTopic() +
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() +
bAckMsg.getPopTime() + brokerName;
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {