This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 20dc5c96e [ISSUE #6316] Nameserver should choose a master with a 
larger epoch when there are two masters in controller mode (#6317)
20dc5c96e is described below

commit 20dc5c96eacaa831b81e9cb038a6e5e33f81ec19
Author: rongtong <[email protected]>
AuthorDate: Mon Mar 13 11:16:35 2023 +0800

    [ISSUE #6316] Nameserver should choose a master with a larger epoch when 
there are two masters in controller mode (#6317)
---
 .../org/apache/rocketmq/broker/BrokerController.java    |  2 --
 .../rocketmq/broker/controller/ReplicasManager.java     |  4 ++++
 .../rocketmq/broker/processor/AdminBrokerProcessor.java |  3 ++-
 .../broker/subscription/SubscriptionGroupManager.java   |  3 ++-
 .../rocketmq/broker/topic/TopicConfigManager.java       | 17 ++++++++---------
 .../org/apache/rocketmq/store/DefaultMessageStore.java  | 10 +++++++++-
 .../store/ha/autoswitch/AutoSwitchHAService.java        |  4 +++-
 7 files changed, 28 insertions(+), 15 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index edaac446f..6a3e402ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1548,8 +1548,6 @@ public class BrokerController {
             this.brokerPreOnlineService.start();
         }
 
-        //Init state version after messageStore initialized.
-        this.topicConfigManager.initStateVersion();
     }
 
     public void start() throws Exception {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 677faca02..3ce0c90ef 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -207,6 +207,8 @@ public class ReplicasManager {
 
                 schedulingCheckSyncStateSet();
 
+                
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
@@ -243,6 +245,8 @@ public class ReplicasManager {
                 // Notify ha service, change to slave
                 this.haService.changeToSlave(newMasterAddress, newMasterEpoch, 
this.brokerConfig.getBrokerId());
 
+                
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+
                 this.executorService.submit(() -> {
                     // Register broker to name-srv
                     try {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 17e1e86c9..be98cb3e4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -777,7 +777,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                     LOGGER.info("updateBrokerConfig, new config: [{}] client: 
{} ", properties, ctx.channel().remoteAddress());
                     
this.brokerController.getConfiguration().update(properties);
                     if (properties.containsKey("brokerPermission")) {
-                        
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+                        long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
+                        
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(stateMachineVersion);
                         this.brokerController.registerBrokerAll(false, false, 
true);
                     }
                 } else {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 779f5776a..808f37058 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -195,7 +195,8 @@ public class SubscriptionGroupManager extends ConfigManager 
{
             log.info("set group forbidden, {}@{} old: {} new: {}", group, 
topic, 0, forbidden);
         }
 
-        this.dataVersion.nextVersion();
+        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
+        dataVersion.nextVersion(stateMachineVersion);
 
         this.persist();
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 0e5d5370d..16140d4cd 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -294,7 +294,8 @@ public class TopicConfigManager extends ConfigManager {
                     }
                     log.info("Create new topic [{}] config:[{}]", 
topicConfig.getTopicName(), topicConfig);
                     this.topicConfigTable.put(topicConfig.getTopicName(), 
topicConfig);
-                    this.dataVersion.nextVersion();
+                    long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
+                    dataVersion.nextVersion(stateMachineVersion);
                     createNew = true;
                     this.persist();
                 } finally {
@@ -394,7 +395,8 @@ public class TopicConfigManager extends ConfigManager {
                     log.info("create new topic {}", topicConfig);
                     
this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, 
topicConfig);
                     createNew = true;
-                    this.dataVersion.nextVersion();
+                    long stateMachineVersion = 
brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0;
+                    dataVersion.nextVersion(stateMachineVersion);
                     this.persist();
                 } finally {
                     this.topicConfigTableLock.unlock();
@@ -540,7 +542,8 @@ public class TopicConfigManager extends ConfigManager {
         TopicConfig old = this.topicConfigTable.remove(topic);
         if (old != null) {
             log.info("delete topic config OK, topic: {}", old);
-            this.dataVersion.nextVersion();
+            long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            dataVersion.nextVersion(stateMachineVersion);
             this.persist();
         } else {
             log.warn("delete topic config failed, topic: {} not exists", 
topic);
@@ -556,12 +559,6 @@ public class TopicConfigManager extends ConfigManager {
         return topicConfigSerializeWrapper;
     }
 
-    public void initStateVersion() {
-        long stateMachineVersion = brokerController.getMessageStore() != null 
? brokerController.getMessageStore().getStateMachineVersion() : 0;
-        dataVersion.nextVersion(stateMachineVersion);
-        this.persist();
-    }
-
     @Override
     public String encode() {
         return encode(false);
@@ -734,4 +731,6 @@ public class TopicConfigManager extends ConfigManager {
     public boolean containsTopic(String topic) {
         return topicConfigTable.containsKey(topic);
     }
+
+
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 90493bd1b..1d0cdad14 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -198,6 +198,8 @@ public class DefaultMessageStore implements MessageStore {
 
     private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = 
new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
 
+    private long stateMachineVersion = 0L;
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, 
final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -1909,7 +1911,11 @@ public class DefaultMessageStore implements MessageStore 
{
 
     @Override
     public long getStateMachineVersion() {
-        return 0L;
+        return stateMachineVersion;
+    }
+
+    public void setStateMachineVersion(long stateMachineVersion) {
+        this.stateMachineVersion = stateMachineVersion;
     }
 
     public BrokerStatsManager getBrokerStatsManager() {
@@ -3215,4 +3221,6 @@ public class DefaultMessageStore implements MessageStore {
         return this.messageStoreConfig.isTransientStorePoolEnable() &&
             (this.brokerConfig.isEnableControllerMode() || 
this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
     }
+
+
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 7382587dc..325341c66 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -147,8 +147,8 @@ public class AutoSwitchHAService extends DefaultHAService {
         }
 
         LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset 
is {}", truncateOffset, getConfirmOffset(), 
this.defaultMessageStore.getMaxPhyOffset());
-
         this.defaultMessageStore.recoverTopicQueueTable();
+        this.defaultMessageStore.setStateMachineVersion(masterEpoch);
         LOGGER.info("Change ha to master success, newMasterEpoch:{}, 
startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
         return true;
     }
@@ -178,6 +178,8 @@ public class AutoSwitchHAService extends DefaultHAService {
                 
defaultMessageStore.getTransientStorePool().setRealCommit(false);
             }
 
+            this.defaultMessageStore.setStateMachineVersion(newMasterEpoch);
+
             LOGGER.info("Change ha to slave success, newMasterAddress:{}, 
newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
             return true;
         } catch (final Exception e) {

Reply via email to