This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c05a0b222 [ISSUE #6377] Polish the code when broker change to master (#6378)
c05a0b222 is described below
commit c05a0b222c5d5593daffc984b6835022bd193652
Author: rongtong <[email protected]>
AuthorDate: Mon Mar 20 09:52:53 2023 +0800
[ISSUE #6377] Polish the code when broker change to master (#6378)
* Polish the code when broker change to master
* Pass the check style
* Polish the log
---
.../broker/controller/ReplicasManager.java | 89 +++++++++++++---------
1 file changed, 51 insertions(+), 38 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index c83c823c0..5bdd1dbe9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -110,7 +110,7 @@ public class ReplicasManager {
this.scheduledService = Executors.newScheduledThreadPool(3, new
ThreadFactoryImpl("ReplicasManager_ScheduledService_",
brokerController.getBrokerIdentity()));
this.executorService = Executors.newFixedThreadPool(3, new
ThreadFactoryImpl("ReplicasManager_ExecutorService_",
brokerController.getBrokerIdentity()));
this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("ReplicasManager_scan_thread_",
brokerController.getBrokerIdentity()));
+ new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("ReplicasManager_scan_thread_",
brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService)
brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
this.availableControllerAddresses = new ConcurrentHashMap<>();
@@ -166,7 +166,8 @@ public class ReplicasManager {
}
private boolean startBasicService() {
- if (this.state == State.SHUTDOWN) return false;
+ if (this.state == State.SHUTDOWN)
+ return false;
if (this.state == State.INITIAL) {
if (schedulingSyncControllerMetadata()) {
this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
@@ -226,8 +227,9 @@ public class ReplicasManager {
this.scanExecutor.shutdownNow();
}
- public synchronized void changeBrokerRole(final Long newMasterBrokerId,
final String newMasterAddress, final Integer newMasterEpoch,
- final Integer syncStateSetEpoch,
final Set<Long> syncStateSet) {
+ public synchronized void changeBrokerRole(final Long newMasterBrokerId,
final String newMasterAddress,
+ final Integer newMasterEpoch,
+ final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch,
syncStateSet);
@@ -244,9 +246,13 @@ public class ReplicasManager {
this.masterEpoch = newMasterEpoch;
if (this.masterBrokerId != null &&
this.masterBrokerId.equals(this.brokerControllerId) &&
this.brokerController.getBrokerConfig().getBrokerId() == MixAll.MASTER_ID) {
+ // Change SyncStateSet
+ final HashSet<Long> newSyncStateSet = new
HashSet<>(syncStateSet);
+ changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
// if master doesn't change
this.haService.changeToMasterWhenLastRoleIsMaster(newMasterEpoch);
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+
this.executorService.submit(this::checkSyncStateSetAndDoReport);
registerBrokerWhenRoleChange();
return;
}
@@ -272,6 +278,7 @@ public class ReplicasManager {
schedulingCheckSyncStateSet();
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(newMasterEpoch);
+
this.executorService.submit(this::checkSyncStateSetAndDoReport);
registerBrokerWhenRoleChange();
}
}
@@ -281,7 +288,7 @@ public class ReplicasManager {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to slave, brokerName={},
brokerId={}, newMasterBrokerId={}, newMasterAddress={}, newMasterEpoch={}",
- this.brokerConfig.getBrokerName(),
this.brokerControllerId, newMasterBrokerId, newMasterAddress, newMasterEpoch);
+ this.brokerConfig.getBrokerName(),
this.brokerControllerId, newMasterBrokerId, newMasterAddress, newMasterEpoch);
this.masterEpoch = newMasterEpoch;
if (newMasterBrokerId.equals(this.masterBrokerId)) {
@@ -318,19 +325,19 @@ public class ReplicasManager {
}
public void registerBrokerWhenRoleChange() {
- String currentRole =
this.brokerController.getMessageStoreConfig().getBrokerRole().equals(BrokerRole.SLAVE)
? "slave" : "master";
this.executorService.submit(() -> {
// Register broker to name-srv
try {
this.brokerController.registerBrokerAll(true, false,
this.brokerController.getBrokerConfig().isForceRegister());
} catch (final Throwable e) {
- LOGGER.error("Error happen when register broker to name-srv,
Failed to change broker to {}", currentRole, e);
+ LOGGER.error("Error happen when register broker to name-srv,
Failed to change broker to {}",
this.brokerController.getMessageStoreConfig().getBrokerRole(), e);
return;
}
LOGGER.info("Change broker [id:{}][address:{}] to {},
newMasterBrokerId:{}, newMasterAddress:{}, newMasterEpoch:{},
syncStateSetEpoch:{}",
- this.brokerControllerId, this.brokerAddress, currentRole,
this.masterBrokerId, this.masterAddress, this.masterEpoch,
this.syncStateSetEpoch);
+ this.brokerControllerId, this.brokerAddress,
this.brokerController.getMessageStoreConfig().getBrokerRole(),
this.masterBrokerId, this.masterAddress, this.masterEpoch,
this.syncStateSetEpoch);
});
+
}
private void changeSyncStateSet(final Set<Long> newSyncStateSet, final int
newSyncStateSetEpoch) {
@@ -375,7 +382,7 @@ public class ReplicasManager {
// Broker try to elect itself as a master in broker set.
try {
Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair =
this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(),
- this.brokerConfig.getBrokerName(),
this.brokerControllerId);
+ this.brokerConfig.getBrokerName(), this.brokerControllerId);
ElectMasterResponseHeader tryElectResponse =
tryElectResponsePair.getObject1();
Set<Long> syncStateSet = tryElectResponsePair.getObject2();
final String masterAddress = tryElectResponse.getMasterAddress();
@@ -402,17 +409,17 @@ public class ReplicasManager {
for (String controllerAddress : controllerAddresses) {
if (StringUtils.isNotEmpty(controllerAddress)) {
this.brokerOuterAPI.sendHeartbeatToController(
- controllerAddress,
- this.brokerConfig.getBrokerClusterName(),
- this.brokerAddress,
- this.brokerConfig.getBrokerName(),
- this.brokerControllerId,
- this.brokerConfig.getSendHeartbeatTimeoutMillis(),
- this.brokerConfig.isInBrokerContainer(),
this.getLastEpoch(),
-
this.brokerController.getMessageStore().getMaxPhyOffset(),
- this.getConfirmOffset(),
- this.brokerConfig.getControllerHeartBeatTimeoutMills(),
- this.brokerConfig.getBrokerElectionPriority()
+ controllerAddress,
+ this.brokerConfig.getBrokerClusterName(),
+ this.brokerAddress,
+ this.brokerConfig.getBrokerName(),
+ this.brokerControllerId,
+ this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+ this.brokerConfig.isInBrokerContainer(),
this.getLastEpoch(),
+ this.brokerController.getMessageStore().getMaxPhyOffset(),
+ this.getConfirmOffset(),
+ this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+ this.brokerConfig.getBrokerElectionPriority()
);
}
}
@@ -520,7 +527,7 @@ public class ReplicasManager {
private boolean applyBrokerId() {
try {
ApplyBrokerIdResponseHeader response =
this.brokerOuterAPI.applyBrokerId(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(),
- tempBrokerMetadata.getBrokerId(),
tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
+ tempBrokerMetadata.getBrokerId(),
tempBrokerMetadata.getRegisterCheckCode(), this.controllerLeaderAddress);
return true;
} catch (Exception e) {
@@ -549,14 +556,16 @@ public class ReplicasManager {
}
/**
- * Send registerBrokerToController request to inform controller that now
broker has been registered successfully and controller should update broker
ipAddress if changed
+ * Send registerBrokerToController request to inform controller that now
broker has been registered successfully and
+ * controller should update broker ipAddress if changed
*
* @return whether request success
*/
private boolean registerBrokerToController() {
try {
Pair<RegisterBrokerToControllerResponseHeader, Set<Long>>
responsePair =
this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(), brokerControllerId, brokerAddress,
controllerLeaderAddress);
- if (responsePair == null) return false;
+ if (responsePair == null)
+ return false;
RegisterBrokerToControllerResponseHeader response =
responsePair.getObject1();
Set<Long> syncStateSet = responsePair.getObject2();
final Long masterBrokerId = response.getMasterBrokerId();
@@ -606,24 +615,24 @@ public class ReplicasManager {
if (this.registerState ==
RegisterState.CREATE_TEMP_METADATA_FILE_DONE) {
if (this.tempBrokerMetadata.getClusterName() == null ||
!this.tempBrokerMetadata.getClusterName().equals(this.brokerConfig.getBrokerClusterName()))
{
LOGGER.error("The clusterName: {} in broker temp metadata is
different from the clusterName: {} in broker config",
- this.tempBrokerMetadata.getClusterName(),
this.brokerConfig.getBrokerClusterName());
+ this.tempBrokerMetadata.getClusterName(),
this.brokerConfig.getBrokerClusterName());
return false;
}
if (this.tempBrokerMetadata.getBrokerName() == null ||
!this.tempBrokerMetadata.getBrokerName().equals(this.brokerConfig.getBrokerName()))
{
LOGGER.error("The brokerName: {} in broker temp metadata is
different from the brokerName: {} in broker config",
- this.tempBrokerMetadata.getBrokerName(),
this.brokerConfig.getBrokerName());
+ this.tempBrokerMetadata.getBrokerName(),
this.brokerConfig.getBrokerName());
return false;
}
}
if (this.registerState == RegisterState.CREATE_METADATA_FILE_DONE) {
if (this.brokerMetadata.getClusterName() == null ||
!this.brokerMetadata.getClusterName().equals(this.brokerConfig.getBrokerClusterName()))
{
LOGGER.error("The clusterName: {} in broker metadata is
different from the clusterName: {} in broker config",
- this.brokerMetadata.getClusterName(),
this.brokerConfig.getBrokerClusterName());
+ this.brokerMetadata.getClusterName(),
this.brokerConfig.getBrokerClusterName());
return false;
}
if (this.brokerMetadata.getBrokerName() == null ||
!this.brokerMetadata.getBrokerName().equals(this.brokerConfig.getBrokerName()))
{
LOGGER.error("The brokerName: {} in broker metadata is
different from the brokerName: {} in broker config",
- this.brokerMetadata.getBrokerName(),
this.brokerConfig.getBrokerName());
+ this.brokerMetadata.getBrokerName(),
this.brokerConfig.getBrokerName());
return false;
}
}
@@ -730,18 +739,22 @@ public class ReplicasManager {
this.checkSyncStateSetTaskFuture.cancel(false);
}
this.checkSyncStateSetTaskFuture =
this.scheduledService.scheduleAtFixedRate(() -> {
- final Set<Long> newSyncStateSet =
this.haService.maybeShrinkSyncStateSet();
- newSyncStateSet.add(this.brokerControllerId);
- synchronized (this) {
- if (this.syncStateSet != null) {
- // Check if syncStateSet changed
- if (this.syncStateSet.size() == newSyncStateSet.size() &&
this.syncStateSet.containsAll(newSyncStateSet)) {
- return;
- }
+ checkSyncStateSetAndDoReport();
+ }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(),
TimeUnit.MILLISECONDS);
+ }
+
+ private void checkSyncStateSetAndDoReport() {
+ final Set<Long> newSyncStateSet =
this.haService.maybeShrinkSyncStateSet();
+ newSyncStateSet.add(this.brokerControllerId);
+ synchronized (this) {
+ if (this.syncStateSet != null) {
+ // Check if syncStateSet changed
+ if (this.syncStateSet.size() == newSyncStateSet.size() &&
this.syncStateSet.containsAll(newSyncStateSet)) {
+ return;
}
}
- doReportSyncStateSetChanged(newSyncStateSet);
- }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(),
TimeUnit.MILLISECONDS);
+ }
+ doReportSyncStateSetChanged(newSyncStateSet);
}
private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
@@ -752,7 +765,7 @@ public class ReplicasManager {
}
} catch (final Exception e) {
LOGGER.error("Error happen when change SyncStateSet, broker:{},
masterAddress:{}, masterEpoch:{}, oldSyncStateSet:{}, newSyncStateSet:{},
syncStateSetEpoch:{}",
- this.brokerConfig.getBrokerName(), this.masterAddress,
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch,
e);
+ this.brokerConfig.getBrokerName(), this.masterAddress,
this.masterEpoch, this.syncStateSet, newSyncStateSet, this.syncStateSetEpoch,
e);
}
}