This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e805671c89 [ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207)
e805671c89 is described below
commit e805671c89eb1e647f1bd429b7538761bf69ad27
Author: fujian-zfj <[email protected]>
AuthorDate: Wed Feb 26 11:27:40 2025 +0800
[ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207)
* typo int readme[ecosystem]
* fix slave sunc topic and sub in rocksdb ha mode
---
.../config/v1/RocksDBSubscriptionGroupManager.java | 1 -
.../rocketmq/broker/slave/SlaveSynchronize.java | 44 +++++++++++++++-------
.../subscription/SubscriptionGroupManager.java | 2 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 2 +-
4 files changed, 32 insertions(+), 17 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index ff47152569..b208169e41 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -78,7 +78,6 @@ public class RocksDBSubscriptionGroupManager extends
SubscriptionGroupManager {
return true;
}
-
private boolean merge() {
if (!UtilAll.isPathExists(this.configFilePath()) &&
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("subGroup json file does not exist, so skip merge");
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index bfb5c9dcd0..f75fd21610 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.broker.slave;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -24,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -77,20 +80,28 @@ public class SlaveSynchronize {
try {
TopicConfigAndMappingSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
- if
(!this.brokerController.getTopicConfigManager().getDataVersion()
- .equals(topicWrapper.getDataVersion())) {
+ TopicConfigManager topicConfigManager =
this.brokerController.getTopicConfigManager();
+ if
(!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) {
-
this.brokerController.getTopicConfigManager().getDataVersion()
- .assignNewOne(topicWrapper.getDataVersion());
+
topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion());
ConcurrentMap<String, TopicConfig> newTopicConfigTable =
topicWrapper.getTopicConfigTable();
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
topicConfigManager.getTopicConfigTable();
+
//delete
- ConcurrentMap<String, TopicConfig> topicConfigTable =
this.brokerController.getTopicConfigManager().getTopicConfigTable();
- topicConfigTable.entrySet().removeIf(item ->
!newTopicConfigTable.containsKey(item.getKey()));
+ Iterator<Map.Entry<String, TopicConfig>> iterator =
topicConfigTable.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, TopicConfig> entry = iterator.next();
+ if (!newTopicConfigTable.containsKey(entry.getKey())) {
+ iterator.remove();
+ }
+ topicConfigManager.deleteTopicConfig(entry.getKey());
+ }
+
//update
- topicConfigTable.putAll(newTopicConfigTable);
+
newTopicConfigTable.values().forEach(topicConfigManager::updateSingleTopicConfigWithoutPersist);
- this.brokerController.getTopicConfigManager().persist();
+ topicConfigManager.persist();
}
if (topicWrapper.getTopicQueueMappingDetailMap() != null
&&
!topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion()))
{
@@ -165,19 +176,24 @@ public class SlaveSynchronize {
if
(!this.brokerController.getSubscriptionGroupManager().getDataVersion()
.equals(subscriptionWrapper.getDataVersion())) {
- SubscriptionGroupManager subscriptionGroupManager =
-
this.brokerController.getSubscriptionGroupManager();
- subscriptionGroupManager.getDataVersion().assignNewOne(
- subscriptionWrapper.getDataVersion());
+ SubscriptionGroupManager subscriptionGroupManager =
this.brokerController.getSubscriptionGroupManager();
+
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
ConcurrentMap<String, SubscriptionGroupConfig>
curSubscriptionGroupTable =
subscriptionGroupManager.getSubscriptionGroupTable();
ConcurrentMap<String, SubscriptionGroupConfig>
newSubscriptionGroupTable =
subscriptionWrapper.getSubscriptionGroupTable();
// delete
- curSubscriptionGroupTable.entrySet().removeIf(e ->
!newSubscriptionGroupTable.containsKey(e.getKey()));
+ Iterator<Map.Entry<String, SubscriptionGroupConfig>>
iterator = curSubscriptionGroupTable.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, SubscriptionGroupConfig> configEntry
= iterator.next();
+ if
(!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
+ iterator.remove();
+ }
+
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
+ }
// update
-
curSubscriptionGroupTable.putAll(newSubscriptionGroupTable);
+
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::updateSubscriptionGroupConfigWithoutPersist);
// persist
subscriptionGroupManager.persist();
LOGGER.info("Update slave Subscription Group from master,
{}", masterAddrBak);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index f62a3e4a09..d85342e1a1 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
this.persist();
}
- protected void
updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
+ public void
updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
Map<String, String> newAttributes = request(config);
Map<String, String> currentAttributes = current(config.getGroupName());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 4530c10002..b20cafc101 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -497,7 +497,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
- protected void updateSingleTopicConfigWithoutPersist(final TopicConfig
topicConfig) {
+ public void updateSingleTopicConfigWithoutPersist(final TopicConfig
topicConfig) {
checkNotNull(topicConfig, "topicConfig shouldn't be null");
Map<String, String> newAttributes = request(topicConfig);