This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 20dc5c96e [ISSUE #6316] Nameserver should choose a master with a
larger epoch when there are two masters in controller mode (#6317)
20dc5c96e is described below
commit 20dc5c96eacaa831b81e9cb038a6e5e33f81ec19
Author: rongtong <[email protected]>
AuthorDate: Mon Mar 13 11:16:35 2023 +0800
[ISSUE #6316] Nameserver should choose a master with a larger epoch when
there are two masters in controller mode (#6317)
---
.../org/apache/rocketmq/broker/BrokerController.java | 2 --
.../rocketmq/broker/controller/ReplicasManager.java | 4 ++++
.../rocketmq/broker/processor/AdminBrokerProcessor.java | 3 ++-
.../broker/subscription/SubscriptionGroupManager.java | 3 ++-
.../rocketmq/broker/topic/TopicConfigManager.java | 17 ++++++++---------
.../org/apache/rocketmq/store/DefaultMessageStore.java | 10 +++++++++-
.../store/ha/autoswitch/AutoSwitchHAService.java | 4 +++-
7 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index edaac446f..6a3e402ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1548,8 +1548,6 @@ public class BrokerController {
this.brokerPreOnlineService.start();
}
- //Init state version after messageStore initialized.
- this.topicConfigManager.initStateVersion();
}
public void start() throws Exception {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 677faca02..3ce0c90ef 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -207,6 +207,8 @@ public class ReplicasManager {
schedulingCheckSyncStateSet();
+
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
@@ -243,6 +245,8 @@ public class ReplicasManager {
// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch,
this.brokerConfig.getBrokerId());
+
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+
this.executorService.submit(() -> {
// Register broker to name-srv
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 17e1e86c9..be98cb3e4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -777,7 +777,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
LOGGER.info("updateBrokerConfig, new config: [{}] client:
{} ", properties, ctx.channel().remoteAddress());
this.brokerController.getConfiguration().update(properties);
if (properties.containsKey("brokerPermission")) {
-
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+ long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
+
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(stateMachineVersion);
this.brokerController.registerBrokerAll(false, false,
true);
}
} else {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 779f5776a..808f37058 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -195,7 +195,8 @@ public class SubscriptionGroupManager extends ConfigManager {
log.info("set group forbidden, {}@{} old: {} new: {}", group,
topic, 0, forbidden);
}
- this.dataVersion.nextVersion();
+ long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
this.persist();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 0e5d5370d..16140d4cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -294,7 +294,8 @@ public class TopicConfigManager extends ConfigManager {
}
log.info("Create new topic [{}] config:[{}]",
topicConfig.getTopicName(), topicConfig);
this.topicConfigTable.put(topicConfig.getTopicName(),
topicConfig);
- this.dataVersion.nextVersion();
+ long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
createNew = true;
this.persist();
} finally {
@@ -394,7 +395,8 @@ public class TopicConfigManager extends ConfigManager {
log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC,
topicConfig);
createNew = true;
- this.dataVersion.nextVersion();
+ long stateMachineVersion =
brokerController.getMessageStore() != null ?
brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
this.persist();
} finally {
this.topicConfigTableLock.unlock();
@@ -540,7 +542,8 @@ public class TopicConfigManager extends ConfigManager {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
log.info("delete topic config OK, topic: {}", old);
- this.dataVersion.nextVersion();
+ long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+ dataVersion.nextVersion(stateMachineVersion);
this.persist();
} else {
log.warn("delete topic config failed, topic: {} not exists",
topic);
@@ -556,12 +559,6 @@ public class TopicConfigManager extends ConfigManager {
return topicConfigSerializeWrapper;
}
- public void initStateVersion() {
- long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
- dataVersion.nextVersion(stateMachineVersion);
- this.persist();
- }
-
@Override
public String encode() {
return encode(false);
@@ -734,4 +731,6 @@ public class TopicConfigManager extends ConfigManager {
public boolean containsTopic(String topic) {
return topicConfigTable.containsKey(topic);
}
+
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 90493bd1b..1d0cdad14 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -198,6 +198,8 @@ public class DefaultMessageStore implements MessageStore {
private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue =
new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+ private long stateMachineVersion = 0L;
+
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
@@ -1909,7 +1911,11 @@ public class DefaultMessageStore implements MessageStore {
@Override
public long getStateMachineVersion() {
- return 0L;
+ return stateMachineVersion;
+ }
+
+ public void setStateMachineVersion(long stateMachineVersion) {
+ this.stateMachineVersion = stateMachineVersion;
}
public BrokerStatsManager getBrokerStatsManager() {
@@ -3215,4 +3221,6 @@ public class DefaultMessageStore implements MessageStore {
return this.messageStoreConfig.isTransientStorePoolEnable() &&
(this.brokerConfig.isEnableControllerMode() ||
this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
}
+
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 7382587dc..325341c66 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -147,8 +147,8 @@ public class AutoSwitchHAService extends DefaultHAService {
}
LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset
is {}", truncateOffset, getConfirmOffset(),
this.defaultMessageStore.getMaxPhyOffset());
-
this.defaultMessageStore.recoverTopicQueueTable();
+ this.defaultMessageStore.setStateMachineVersion(masterEpoch);
LOGGER.info("Change ha to master success, newMasterEpoch:{},
startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
}
@@ -178,6 +178,8 @@ public class AutoSwitchHAService extends DefaultHAService {
defaultMessageStore.getTransientStorePool().setRealCommit(false);
}
+ this.defaultMessageStore.setStateMachineVersion(newMasterEpoch);
+
LOGGER.info("Change ha to slave success, newMasterAddress:{},
newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
} catch (final Exception e) {