This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9b97eaf5b [ISSUE #6424] Make topicConfig updating atomically
9b97eaf5b is described below
commit 9b97eaf5ba8468dcd4b0d695f1de1e8f29170618
Author: guyinyou <[email protected]>
AuthorDate: Thu Mar 23 14:18:19 2023 +0800
[ISSUE #6424] Make topicConfig updating atomically
Co-authored-by: guyinyou <[email protected]>
---
.../rocketmq/broker/slave/SlaveSynchronize.java | 71 +++++++++++++++-------
1 file changed, 48 insertions(+), 23 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 7d22ffb45..8cbdc2555 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,11 +17,16 @@
package org.apache.rocketmq.broker.slave;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -70,24 +75,44 @@ public class SlaveSynchronize {
if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigAndMappingSerializeWrapper topicWrapper =
-
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if
(!this.brokerController.getTopicConfigManager().getDataVersion()
- .equals(topicWrapper.getDataVersion())) {
+ .equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
- .assignNewOne(topicWrapper.getDataVersion());
-
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
-
this.brokerController.getTopicConfigManager().getTopicConfigTable()
- .putAll(topicWrapper.getTopicConfigTable());
+ .assignNewOne(topicWrapper.getDataVersion());
+
+ ConcurrentMap<String, TopicConfig> newTopicConfigTable =
topicWrapper.getTopicConfigTable();
+ //delete
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
this.brokerController.getTopicConfigManager().getTopicConfigTable();
+ for (Iterator<Map.Entry<String, TopicConfig>> it =
topicConfigTable.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, TopicConfig> item = it.next();
+ if (!newTopicConfigTable.containsKey(item.getKey())) {
+ it.remove();
+ }
+ }
+ //update
+ topicConfigTable.putAll(newTopicConfigTable);
+
this.brokerController.getTopicConfigManager().persist();
}
if (topicWrapper.getTopicQueueMappingDetailMap() != null
&&
!topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion()))
{
this.brokerController.getTopicQueueMappingManager().getDataVersion()
.assignNewOne(topicWrapper.getMappingDataVersion());
-
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable().clear();
-
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable()
-
.putAll(topicWrapper.getTopicQueueMappingDetailMap());
+
+ ConcurrentMap<String, TopicConfig> newTopicConfigTable =
topicWrapper.getTopicConfigTable();
+ //delete
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
this.brokerController.getTopicConfigManager().getTopicConfigTable();
+ for (Iterator<Map.Entry<String, TopicConfig>> it =
topicConfigTable.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, TopicConfig> item = it.next();
+ if (!newTopicConfigTable.containsKey(item.getKey())) {
+ it.remove();
+ }
+ }
+ //update
+ topicConfigTable.putAll(newTopicConfigTable);
+
this.brokerController.getTopicQueueMappingManager().persist();
}
LOGGER.info("Update slave topic config from master, {}",
masterAddrBak);
@@ -102,9 +127,9 @@ public class SlaveSynchronize {
if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
-
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
- .putAll(offsetWrapper.getOffsetTable());
+ .putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(offsetWrapper.getDataVersion());
this.brokerController.getConsumerOffsetManager().persist();
LOGGER.info("Update slave consumer offset from master, {}",
masterAddrBak);
@@ -119,12 +144,12 @@ public class SlaveSynchronize {
if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
-
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
if (delayOffset != null) {
String fileName =
-
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
- .getMessageStoreConfig().getStorePathRootDir());
+
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+
.getMessageStoreConfig().getStorePathRootDir());
try {
MixAll.string2File(delayOffset, fileName);
this.brokerController.getScheduleMessageService().load();
@@ -141,21 +166,21 @@ public class SlaveSynchronize {
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
- if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
+ if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
- this.brokerController.getBrokerOuterAPI()
- .getAllSubscriptionGroupConfig(masterAddrBak);
+ this.brokerController.getBrokerOuterAPI()
+ .getAllSubscriptionGroupConfig(masterAddrBak);
if
(!this.brokerController.getSubscriptionGroupManager().getDataVersion()
- .equals(subscriptionWrapper.getDataVersion())) {
+ .equals(subscriptionWrapper.getDataVersion())) {
SubscriptionGroupManager subscriptionGroupManager =
- this.brokerController.getSubscriptionGroupManager();
+
this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(
- subscriptionWrapper.getDataVersion());
+ subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
- subscriptionWrapper.getSubscriptionGroupTable());
+ subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
LOGGER.info("Update slave Subscription Group from master,
{}", masterAddrBak);
}
@@ -167,7 +192,7 @@ public class SlaveSynchronize {
private void syncMessageRequestMode() {
String masterAddrBak = this.masterAddr;
- if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
+ if (masterAddrBak != null &&
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
MessageRequestModeSerializeWrapper
messageRequestModeSerializeWrapper =
this.brokerController.getBrokerOuterAPI().getAllMessageRequestMode(masterAddrBak);
@@ -207,7 +232,7 @@ public class SlaveSynchronize {
try {
if (null !=
brokerController.getMessageStore().getTimerMessageStore()) {
TimerMetrics.TimerMetricsSerializeWrapper
metricsSerializeWrapper =
-
this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak);
+
this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak);
if
(!brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().equals(metricsSerializeWrapper.getDataVersion()))
{
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().assignNewOne(metricsSerializeWrapper.getDataVersion());
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getTimingCount().clear();