This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9b97eaf5b  [ISSUE #6424] Make topicConfig updates atomic
9b97eaf5b is described below

commit 9b97eaf5ba8468dcd4b0d695f1de1e8f29170618
Author: guyinyou <[email protected]>
AuthorDate: Thu Mar 23 14:18:19 2023 +0800

     [ISSUE #6424] Make topicConfig updates atomic
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../rocketmq/broker/slave/SlaveSynchronize.java    | 71 +++++++++++++++-------
 1 file changed, 48 insertions(+), 23 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 7d22ffb45..8cbdc2555 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,11 +17,16 @@
 package org.apache.rocketmq.broker.slave;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -70,24 +75,44 @@ public class SlaveSynchronize {
         if (masterAddrBak != null && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
                 TopicConfigAndMappingSerializeWrapper topicWrapper =
-                    
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+                        
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                 if 
(!this.brokerController.getTopicConfigManager().getDataVersion()
-                    .equals(topicWrapper.getDataVersion())) {
+                        .equals(topicWrapper.getDataVersion())) {
 
                     
this.brokerController.getTopicConfigManager().getDataVersion()
-                        .assignNewOne(topicWrapper.getDataVersion());
-                    
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
-                    
this.brokerController.getTopicConfigManager().getTopicConfigTable()
-                        .putAll(topicWrapper.getTopicConfigTable());
+                            .assignNewOne(topicWrapper.getDataVersion());
+
+                    ConcurrentMap<String, TopicConfig> newTopicConfigTable = 
topicWrapper.getTopicConfigTable();
+                    //delete
+                    ConcurrentMap<String, TopicConfig> topicConfigTable = 
this.brokerController.getTopicConfigManager().getTopicConfigTable();
+                    for (Iterator<Map.Entry<String, TopicConfig>> it = 
topicConfigTable.entrySet().iterator(); it.hasNext(); ) {
+                        Map.Entry<String, TopicConfig> item = it.next();
+                        if (!newTopicConfigTable.containsKey(item.getKey())) {
+                            it.remove();
+                        }
+                    }
+                    //update
+                    topicConfigTable.putAll(newTopicConfigTable);
+
                     this.brokerController.getTopicConfigManager().persist();
                 }
                 if (topicWrapper.getTopicQueueMappingDetailMap() != null
                         && 
!topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion()))
 {
                     
this.brokerController.getTopicQueueMappingManager().getDataVersion()
                             
.assignNewOne(topicWrapper.getMappingDataVersion());
-                    
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable().clear();
-                    
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable()
-                            
.putAll(topicWrapper.getTopicQueueMappingDetailMap());
+
+                    ConcurrentMap<String, TopicConfig> newTopicConfigTable = 
topicWrapper.getTopicConfigTable();
+                    //delete
+                    ConcurrentMap<String, TopicConfig> topicConfigTable = 
this.brokerController.getTopicConfigManager().getTopicConfigTable();
+                    for (Iterator<Map.Entry<String, TopicConfig>> it = 
topicConfigTable.entrySet().iterator(); it.hasNext(); ) {
+                        Map.Entry<String, TopicConfig> item = it.next();
+                        if (!newTopicConfigTable.containsKey(item.getKey())) {
+                            it.remove();
+                        }
+                    }
+                    //update
+                    topicConfigTable.putAll(newTopicConfigTable);
+
                     
this.brokerController.getTopicQueueMappingManager().persist();
                 }
                 LOGGER.info("Update slave topic config from master, {}", 
masterAddrBak);
@@ -102,9 +127,9 @@ public class SlaveSynchronize {
         if (masterAddrBak != null && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
                 ConsumerOffsetSerializeWrapper offsetWrapper =
-                    
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+                        
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                 
this.brokerController.getConsumerOffsetManager().getOffsetTable()
-                    .putAll(offsetWrapper.getOffsetTable());
+                        .putAll(offsetWrapper.getOffsetTable());
                 
this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(offsetWrapper.getDataVersion());
                 this.brokerController.getConsumerOffsetManager().persist();
                 LOGGER.info("Update slave consumer offset from master, {}", 
masterAddrBak);
@@ -119,12 +144,12 @@ public class SlaveSynchronize {
         if (masterAddrBak != null && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
                 String delayOffset =
-                    
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+                        
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                 if (delayOffset != null) {
 
                     String fileName =
-                        
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
-                            .getMessageStoreConfig().getStorePathRootDir());
+                            
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+                                    
.getMessageStoreConfig().getStorePathRootDir());
                     try {
                         MixAll.string2File(delayOffset, fileName);
                         
this.brokerController.getScheduleMessageService().load();
@@ -141,21 +166,21 @@ public class SlaveSynchronize {
 
     private void syncSubscriptionGroupConfig() {
         String masterAddrBak = this.masterAddr;
-        if (masterAddrBak != null  && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
+        if (masterAddrBak != null && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
                 SubscriptionGroupWrapper subscriptionWrapper =
-                    this.brokerController.getBrokerOuterAPI()
-                        .getAllSubscriptionGroupConfig(masterAddrBak);
+                        this.brokerController.getBrokerOuterAPI()
+                                .getAllSubscriptionGroupConfig(masterAddrBak);
 
                 if 
(!this.brokerController.getSubscriptionGroupManager().getDataVersion()
-                    .equals(subscriptionWrapper.getDataVersion())) {
+                        .equals(subscriptionWrapper.getDataVersion())) {
                     SubscriptionGroupManager subscriptionGroupManager =
-                        this.brokerController.getSubscriptionGroupManager();
+                            
this.brokerController.getSubscriptionGroupManager();
                     subscriptionGroupManager.getDataVersion().assignNewOne(
-                        subscriptionWrapper.getDataVersion());
+                            subscriptionWrapper.getDataVersion());
                     
subscriptionGroupManager.getSubscriptionGroupTable().clear();
                     
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
-                        subscriptionWrapper.getSubscriptionGroupTable());
+                            subscriptionWrapper.getSubscriptionGroupTable());
                     subscriptionGroupManager.persist();
                     LOGGER.info("Update slave Subscription Group from master, 
{}", masterAddrBak);
                 }
@@ -167,7 +192,7 @@ public class SlaveSynchronize {
 
     private void syncMessageRequestMode() {
         String masterAddrBak = this.masterAddr;
-        if (masterAddrBak != null  && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
+        if (masterAddrBak != null && 
!masterAddrBak.equals(brokerController.getBrokerAddr())) {
             try {
                 MessageRequestModeSerializeWrapper 
messageRequestModeSerializeWrapper =
                         
this.brokerController.getBrokerOuterAPI().getAllMessageRequestMode(masterAddrBak);
@@ -207,7 +232,7 @@ public class SlaveSynchronize {
             try {
                 if (null != 
brokerController.getMessageStore().getTimerMessageStore()) {
                     TimerMetrics.TimerMetricsSerializeWrapper 
metricsSerializeWrapper =
-                        
this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak);
+                            
this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak);
                     if 
(!brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().equals(metricsSerializeWrapper.getDataVersion()))
 {
                         
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().assignNewOne(metricsSerializeWrapper.getDataVersion());
                         
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getTimingCount().clear();

Reply via email to