This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new faae64715d [ISSUE #7601] Fix slave acting master bug (#7603)
faae64715d is described below
commit faae64715d917bb5d64b8d72581172d26ebe9501
Author: gaoyf <[email protected]>
AuthorDate: Thu Dec 7 11:25:22 2023 +0800
[ISSUE #7601] Fix slave acting master bug (#7603)
* fix NullPointerException when message escape to remote
* fix NumberFormatException when message retry to escape to remote
* fix timerCheckPoint of the master is not updated, causing the timer
message to be replayed after master is restarted
* Use properties copies instead of referencing the same map when converting
message
---
.../java/org/apache/rocketmq/broker/BrokerController.java | 1 +
.../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 4 +++-
.../org/apache/rocketmq/common/message/MessageAccessor.java | 7 +++++++
.../org/apache/rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++---
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9f1fd0ad02..8d29d44383 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2108,6 +2108,7 @@ public class BrokerController {
isScheduleServiceStart = shouldStart;
if (timerMessageStore != null) {
+ timerMessageStore.syncLastReadTimeMs();
timerMessageStore.setShouldRunningDequeue(shouldStart);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 53cdecdf85..7f802adb93 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -215,11 +215,13 @@ public class SlaveSynchronize {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
- if (null !=
brokerController.getMessageStore().getTimerMessageStore()) {
+ if (null !=
brokerController.getMessageStore().getTimerMessageStore() &&
+
!brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
TimerCheckpoint checkpoint =
this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
if (null != this.brokerController.getTimerCheckpoint()) {
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
+
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
}
}
} catch (Exception e) {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 1b7e2bba32..62e3bbd7e6 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.message;
+import java.util.HashMap;
import java.util.Map;
public class MessageAccessor {
@@ -96,4 +97,10 @@ public class MessageAccessor {
return newMsg;
}
+ public static Map<String, String> deepCopyProperties(Map<String, String>
properties) {
+ if (properties == null) {
+ return null;
+ }
+ return new HashMap<>(properties);
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index d796e4467d..872cd71054 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -602,6 +602,10 @@ public class TimerMessageStore {
this.shouldRunningDequeue = shouldRunningDequeue;
}
+ public boolean isShouldRunningDequeue() {
+ return shouldRunningDequeue;
+ }
+
public void addMetric(MessageExt msg, int value) {
try {
if (null == msg || null ==
msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
@@ -1084,8 +1088,10 @@ public class TimerMessageStore {
case PUT_OK:
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
-
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
-
putMessageResult.getAppendMessageResult().getWroteBytes());
+ if (putMessageResult.getAppendMessageResult() !=
null) {
+
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
+
putMessageResult.getAppendMessageResult().getWroteBytes());
+ }
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;
@@ -1119,7 +1125,7 @@ public class TimerMessageStore {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+ MessageAccessor.setProperties(msgInner,
MessageAccessor.deepCopyProperties(msgExt.getProperties()));
TopicFilterType topicFilterType =
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType,
msgInner.getTags());