This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller
by this push:
new f905da411 [Summer of code] Let broker send heartbeat to controller
(#4341)
f905da411 is described below
commit f905da411a71eecba9f896db52227a7b3081b6a6
Author: hzh0425 <[email protected]>
AuthorDate: Fri May 20 17:35:07 2022 +0800
[Summer of code] Let broker send heartbeat to controller (#4341)
* let broker send heartbeat to controller
* code review
* code review
* fix bug
* add state in replicasmanager
* add brokerId when getReplicasInfo
* code review
---
.../apache/rocketmq/broker/BrokerController.java | 72 +++++++++++++++-------
.../broker/hacontroller/ReplicasManager.java | 70 +++++++++++++++++----
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 39 +++++++++++-
.../org/apache/rocketmq/common/BrokerConfig.java | 50 ++++++++++++---
.../common/protocol/body/SyncStateSet.java | 2 +-
.../controller/GetReplicaInfoRequestHeader.java | 18 +++++-
.../controller/GetReplicaInfoResponseHeader.java | 11 ++++
.../rocketmq/container/InnerBrokerController.java | 2 +-
.../org/apache/rocketmq/controller/Controller.java | 5 ++
.../rocketmq/controller/ControllerManager.java | 18 +++---
.../controller/impl/DledgerController.java | 5 ++
.../controller/impl/manager/BrokerInfo.java | 5 +-
.../impl/manager/ReplicasInfoManager.java | 5 ++
.../apache/rocketmq/store/DefaultMessageStore.java | 4 +-
.../rocketmq/store/config/MessageStoreConfig.java | 14 -----
.../store/ha/autoswitch/AutoSwitchHATest.java | 4 +-
.../test/autoswitchrole/AutoSwitchRoleBase.java | 2 +-
.../AutoSwitchRoleIntegrationTest.java | 2 +-
18 files changed, 249 insertions(+), 79 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2aa307343..cb2a4cc32 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
@@ -603,7 +604,7 @@ public class BrokerController {
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
- if (!messageStoreConfig.isEnableDLegerCommitLog() &&
!messageStoreConfig.isDuplicationEnable() &&
!messageStoreConfig.isStartupControllerMode()) {
+ if (!messageStoreConfig.isEnableDLegerCommitLog() &&
!messageStoreConfig.isDuplicationEnable() &&
!brokerConfig.isStartupControllerMode()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null &&
this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH)
{
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
@@ -639,7 +640,7 @@ public class BrokerController {
}
}
- if (this.messageStoreConfig.isStartupControllerMode()) {
+ if (this.brokerConfig.isStartupControllerMode()) {
this.updateMasterHAServerAddrPeriodically = true;
}
}
@@ -711,7 +712,7 @@ public class BrokerController {
MessageStorePluginContext context = new
MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager,
messageArrivingListener);
this.messageStore = MessageStoreFactory.build(context,
defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new
CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
- if (this.messageStoreConfig.isStartupControllerMode()) {
+ if (this.brokerConfig.isStartupControllerMode()) {
this.replicasManager = new ReplicasManager(this);
}
} catch (IOException e) {
@@ -1452,7 +1453,7 @@ public class BrokerController {
startBasicService();
- if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog()
&& !this.messageStoreConfig.isDuplicationEnable() &&
!this.messageStoreConfig.isStartupControllerMode()) {
+ if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog()
&& !this.messageStoreConfig.isDuplicationEnable() &&
!this.brokerConfig.isStartupControllerMode()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() ==
MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
@@ -1490,7 +1491,7 @@ public class BrokerController {
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(),
TimeUnit.MILLISECONDS));
}
- if (this.messageStoreConfig.isStartupControllerMode()) {
+ if (this.brokerConfig.isStartupControllerMode()) {
scheduleSendHeartbeat();
}
@@ -1600,6 +1601,10 @@ public class BrokerController {
BrokerController.LOG.info("BrokerController#doResterBrokerAll:
broker has shutdown, no need to register any more.");
return;
}
+ Long heartbeatTimeoutMillis =
(this.brokerConfig.isEnableSlaveActingMaster() ||
+ (this.brokerConfig.isStartupControllerMode() &&
!this.brokerConfig.isControllerDeployedStandAlone())) ?
+ this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null;
+
List<RegisterBrokerResult> registerBrokerResultList =
this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
@@ -1612,30 +1617,51 @@ public class BrokerController {
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isEnableSlaveActingMaster(),
this.brokerConfig.isCompressedRegister(),
- (this.brokerConfig.isEnableSlaveActingMaster() ||
this.messageStoreConfig.isStartupControllerMode()) ?
this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
+ heartbeatTimeoutMillis,
this.getBrokerIdentity());
handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
}
protected void sendHeartbeat() {
- if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
- this.brokerOuterAPI.sendHeartbeatViaDataVersion(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.brokerConfig.getSendHeartbeatTimeoutMillis(),
- this.getTopicConfigManager().getDataVersion(),
- this.brokerConfig.isInBrokerContainer());
- } else {
- this.brokerOuterAPI.sendHeartbeat(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.brokerConfig.getSendHeartbeatTimeoutMillis(),
- this.brokerConfig.isInBrokerContainer());
+ boolean shouldSendHeartbeatToController =
this.brokerConfig.isStartupControllerMode() &&
this.brokerConfig.isControllerDeployedStandAlone();
+ if (shouldSendHeartbeatToController) {
+ final List<String> controllerAddresses =
this.replicasManager.getControllerAddresses();
+ for (String controllerAddress : controllerAddresses) {
+ if (StringUtils.isNotEmpty(controllerAddress)) {
+ this.brokerOuterAPI.sendHeartbeatToController(
+ controllerAddress,
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+ this.brokerConfig.isInBrokerContainer()
+ );
+ }
+ }
+ }
+
+ boolean shouldSendHeartbeatToNameSrv =
this.brokerConfig.isEnableSlaveActingMaster() ||
!shouldSendHeartbeatToController;
+ if (shouldSendHeartbeatToNameSrv) {
+ if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
+ this.brokerOuterAPI.sendHeartbeatViaDataVersion(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+ this.getTopicConfigManager().getDataVersion(),
+ this.brokerConfig.isInBrokerContainer());
+ } else {
+ this.brokerOuterAPI.sendHeartbeat(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+ this.brokerConfig.isInBrokerContainer());
+ }
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index fd09239ab..b59995ae2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -63,6 +63,7 @@ public class ReplicasManager {
private final List<String> controllerAddresses;
private volatile String controllerLeaderAddress = "";
+ private volatile State state = State.INITIAL;
private ScheduledFuture<?> checkSyncStateSetTaskFuture;
private ScheduledFuture<?> slaveSyncFuture;
@@ -76,7 +77,7 @@ public class ReplicasManager {
this.brokerController = brokerController;
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
this.scheduledService = Executors.newScheduledThreadPool(3, new
ThreadFactoryImpl("ReplicasManager_ScheduledService_",
brokerController.getBrokerIdentity()));
- this.executorService = Executors.newFixedThreadPool(2, new
ThreadFactoryImpl("ReplicasManager_ExecutorService_",
brokerController.getBrokerIdentity()));
+ this.executorService = Executors.newFixedThreadPool(3, new
ThreadFactoryImpl("ReplicasManager_ExecutorService_",
brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService)
brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
@@ -89,19 +90,56 @@ public class ReplicasManager {
this.haService.setLocalAddress(this.localAddress);
}
+ enum State {
+ INITIAL,
+ FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+ RUNNING,
+ }
+
public void start() {
- if (!schedulingSyncControllerMetadata()) {
- return;
+ if (!startBasicService()) {
+ LOGGER.error("Failed to start replicasManager");
+ this.executorService.submit(() -> {
+ int tryTimes = 1;
+ while (!startBasicService()) {
+ tryTimes++;
+ LOGGER.error("Failed to start replicasManager, try
times:{}, current state:{}, try it again", tryTimes, this.state);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ LOGGER.info("Start replicasManager success, try times:{}",
tryTimes);
+ });
+ }
+ }
+
+ private boolean startBasicService() {
+ if (this.state == State.INITIAL) {
+ if (schedulingSyncControllerMetadata()) {
+ LOGGER.info("First time sync controller metadata success");
+ this.state = State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE;
+ } else {
+ return false;
+ }
}
- if (!registerBroker()) {
- return;
+ if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
+ if (registerBroker()) {
+ LOGGER.info("First time register broker success");
+ this.state = State.RUNNING;
+ } else {
+ return false;
+ }
}
schedulingSyncBrokerMetadata();
+ return true;
}
public void shutdown() {
+ this.state = State.INITIAL;
+ this.executorService.shutdown();
this.scheduledService.shutdown();
}
@@ -143,7 +181,7 @@ public class ReplicasManager {
}
}
- public void changeToSlave(final String newMasterAddress, final int
newMasterEpoch) {
+ public void changeToSlave(final String newMasterAddress, final int
newMasterEpoch, long brokerId) {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to slave, brokerName={},
replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(),
this.localAddress, this.brokerConfig.getBrokerId());
@@ -156,6 +194,7 @@ public class ReplicasManager {
// Change config
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
this.brokerController.changeSpecialServiceStatus(false);
+ this.brokerConfig.setBrokerId(brokerId);
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
@@ -214,12 +253,11 @@ public class ReplicasManager {
try {
final BrokerRegisterResponseHeader registerResponse =
this.brokerOuterAPI.registerBroker(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(),
this.localAddress);
final String newMasterAddress =
registerResponse.getMasterAddress();
- this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
if (StringUtils.isNoneEmpty(newMasterAddress)) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
changeToMaster(registerResponse.getMasterEpoch(),
registerResponse.getSyncStateSetEpoch());
} else {
- changeToSlave(newMasterAddress,
registerResponse.getMasterEpoch());
+ changeToSlave(newMasterAddress,
registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
}
}
return true;
@@ -235,18 +273,24 @@ public class ReplicasManager {
private void schedulingSyncBrokerMetadata() {
this.scheduledService.scheduleAtFixedRate(() -> {
try {
- final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result
= this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress,
this.brokerConfig.getBrokerName());
+ final Pair<GetReplicaInfoResponseHeader, SyncStateSet> result
= this.brokerOuterAPI.getReplicaInfo(this.controllerLeaderAddress,
this.brokerConfig.getBrokerName(), this.localAddress);
final GetReplicaInfoResponseHeader info = result.getObject1();
final SyncStateSet syncStateSet = result.getObject2();
final String newMasterAddress = info.getMasterAddress();
final int newMasterEpoch = info.getMasterEpoch();
+ final long brokerId = info.getBrokerId();
synchronized (this) {
// Check if master changed
if (StringUtils.isNoneEmpty(newMasterAddress) &&
!StringUtils.equals(this.masterAddress, newMasterAddress) && newMasterEpoch >
this.masterEpoch) {
if (StringUtils.equals(newMasterAddress,
this.localAddress)) {
changeToMaster(newMasterEpoch,
syncStateSet.getSyncStateSetEpoch());
} else {
- changeToSlave(newMasterAddress, newMasterEpoch);
+ if (brokerId > 0) {
+ changeToSlave(newMasterAddress,
newMasterEpoch, brokerId);
+ } else if (brokerId < 0) {
+ // If the brokerId is no existed, we should
try register again.
+ registerBroker();
+ }
}
} else {
// Check if sync state set changed
@@ -340,7 +384,7 @@ public class ReplicasManager {
}
public SyncStateSet getSyncStateSet() {
- return new SyncStateSet(new HashSet<>(this.syncStateSet),
this.syncStateSetEpoch);
+ return new SyncStateSet(this.syncStateSet, this.syncStateSetEpoch);
}
public String getLocalAddress() {
@@ -354,4 +398,8 @@ public class ReplicasManager {
public int getMasterEpoch() {
return masterEpoch;
}
+
+ public List<String> getControllerAddresses() {
+ return controllerAddresses;
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 60701913a..dbede0763 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.SendResult;
@@ -1062,8 +1063,8 @@ public class BrokerOuterAPI {
* Get broker replica info
*/
public Pair<GetReplicaInfoResponseHeader, SyncStateSet>
getReplicaInfo(final String controllerAddress,
- final String brokerName) throws Exception {
- final GetReplicaInfoRequestHeader requestHeader = new
GetReplicaInfoRequestHeader(brokerName);
+ final String brokerName, final String brokerAddress) throws Exception {
+ final GetReplicaInfoRequestHeader requestHeader = new
GetReplicaInfoRequestHeader(brokerName, brokerAddress);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
@@ -1080,4 +1081,38 @@ public class BrokerOuterAPI {
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+
+ /**
+ * Send heartbeat to controller
+ */
+ public void sendHeartbeatToController(final String controllerAddress,
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int timeoutMills,
+ final boolean isInBrokerContainer) {
+ if (StringUtils.isEmpty(controllerAddress)) {
+ return;
+ }
+
+ final BrokerHeartbeatRequestHeader requestHeader = new
BrokerHeartbeatRequestHeader();
+ requestHeader.setClusterName(clusterName);
+ requestHeader.setBrokerAddr(brokerAddr);
+ requestHeader.setBrokerName(brokerName);
+
+ brokerOuterExecutor.execute(new AbstractBrokerRunnable(new
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+ @Override
+ public void run2() {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT,
requestHeader);
+
+ try {
+
BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request,
timeoutMills);
+ } catch (Exception e) {
+ LOGGER.error("Error happen when send heartbeat to
controller {}", controllerAddress, e);
+ }
+ }
+ });
+ }
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 898cd3fe0..26d05e1d7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -293,10 +293,24 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean lockInStrictMode = false;
- private String controllerAddr = "";
-
private boolean compatibleWithOldNameSrv = true;
+ /**
+ * Is startup controller mode, which support auto switch broker's role.
+ */
+ private boolean startupControllerMode = false;
+
+ /**
+ * Whether the controller is deployed independently
+ */
+ private boolean isControllerDeployedStandAlone = false;
+
+ /**
+ * If isControllerDeployedStandAlone = false, controllerAddr should be the
addresses of all name-srv which running the controller instance.
+ * If isControllerDeployedStandAlone = true, controllerAddr should be
controller's address.
+ */
+ private String controllerAddr = "";
+
private long replicasManagerSyncBrokerMetadataPeriod = 5 * 1000;
private long replicasManagerCheckSyncStateSetPeriod = 8 * 1000;
@@ -1263,14 +1277,6 @@ public class BrokerConfig extends BrokerIdentity {
this.lockInStrictMode = lockInStrictMode;
}
- public String getControllerAddr() {
- return controllerAddr;
- }
-
- public void setControllerAddr(String controllerAddr) {
- this.controllerAddr = controllerAddr;
- }
-
public boolean isIsolateLogEnable() {
return isolateLogEnable;
}
@@ -1287,6 +1293,30 @@ public class BrokerConfig extends BrokerIdentity {
this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
}
+ public boolean isStartupControllerMode() {
+ return startupControllerMode;
+ }
+
+ public void setStartupControllerMode(boolean startupControllerMode) {
+ this.startupControllerMode = startupControllerMode;
+ }
+
+ public boolean isControllerDeployedStandAlone() {
+ return isControllerDeployedStandAlone;
+ }
+
+ public void setControllerDeployedStandAlone(boolean
controllerDeployedStandAlone) {
+ isControllerDeployedStandAlone = controllerDeployedStandAlone;
+ }
+
+ public String getControllerAddr() {
+ return controllerAddr;
+ }
+
+ public void setControllerAddr(String controllerAddr) {
+ this.controllerAddr = controllerAddr;
+ }
+
public long getReplicasManagerSyncBrokerMetadataPeriod() {
return replicasManagerSyncBrokerMetadataPeriod;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
index 19e44b5f7..1dccf1edd 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SyncStateSet.java
@@ -26,7 +26,7 @@ public class SyncStateSet extends RemotingSerializable {
private int syncStateSetEpoch;
public SyncStateSet(Set<String> syncStateSet, int syncStateSetEpoch) {
- this.syncStateSet = syncStateSet;
+ this.syncStateSet = new HashSet<>(syncStateSet);
this.syncStateSetEpoch = syncStateSetEpoch;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
index 1e8fd9d05..b881ea178 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
@@ -21,6 +21,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetReplicaInfoRequestHeader implements CommandCustomHeader {
private String brokerName;
+ private String brokerAddress;
public GetReplicaInfoRequestHeader() {
}
@@ -29,6 +30,11 @@ public class GetReplicaInfoRequestHeader implements
CommandCustomHeader {
this.brokerName = brokerName;
}
+ public GetReplicaInfoRequestHeader(String brokerName, String
brokerAddress) {
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ }
+
public String getBrokerName() {
return brokerName;
}
@@ -37,10 +43,18 @@ public class GetReplicaInfoRequestHeader implements
CommandCustomHeader {
this.brokerName = brokerName;
}
- @Override
- public String toString() {
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ public void setBrokerAddress(String brokerAddress) {
+ this.brokerAddress = brokerAddress;
+ }
+
+ @Override public String toString() {
return "GetReplicaInfoRequestHeader{" +
"brokerName='" + brokerName + '\'' +
+ ", brokerAddress='" + brokerAddress + '\'' +
'}';
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
index 1a9cb2316..72d4c48c0 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
@@ -22,6 +22,8 @@ import
org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetReplicaInfoResponseHeader implements CommandCustomHeader {
private String masterAddress;
private int masterEpoch;
+ // BrokerId for current replicas.
+ private long brokerId = -1L;
public GetReplicaInfoResponseHeader() {
}
@@ -42,10 +44,19 @@ public class GetReplicaInfoResponseHeader implements
CommandCustomHeader {
this.masterEpoch = masterEpoch;
}
+ public long getBrokerId() {
+ return brokerId;
+ }
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
@Override public String toString() {
return "GetReplicaInfoResponseHeader{" +
"masterAddress='" + masterAddress + '\'' +
", masterEpoch=" + masterEpoch +
+ ", brokerId=" + brokerId +
'}';
}
diff --git
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 48863ce1b..70294e848 100644
---
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -100,7 +100,7 @@ public class InnerBrokerController extends BrokerController
{
}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(),
TimeUnit.MILLISECONDS));
}
- if (this.messageStoreConfig.isStartupControllerMode()) {
+ if (this.brokerConfig.isStartupControllerMode()) {
scheduleSendHeartbeat();
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index ec45a8009..fca8f5565 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -50,6 +50,11 @@ public interface Controller {
*/
void stopScheduling();
+ /**
+ * Whether this controller is in leader state.
+ */
+ boolean isLeaderState();
+
/**
* Alter ISR of broker replicas.
*
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 7b835c5c0..1221709de 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -101,14 +101,18 @@ public class ControllerManager {
@Override
public void onBrokerInactive(String brokerName, String
brokerAddress, long brokerId) {
if (brokerId == MixAll.MASTER_ID) {
- final CompletableFuture<RemotingCommand> future =
controller.electMaster(new ElectMasterRequestHeader(brokerName));
- try {
- final RemotingCommand response = future.get(5,
TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
- if (responseHeader != null) {
- log.info("Broker{}'s master shutdown, elect a new
master:{}", brokerName, responseHeader);
+ if (controller.isLeaderState()) {
+ final CompletableFuture<RemotingCommand> future =
controller.electMaster(new ElectMasterRequestHeader(brokerName));
+ try {
+ final RemotingCommand response = future.get(5,
TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
+ if (responseHeader != null) {
+ log.info("Broker{}'s master shutdown, elect a
new master:{}", brokerName, responseHeader);
+ }
+ } catch (Exception ignored) {
}
- } catch (Exception ignored) {
+ } else {
+ log.info("Broker{}' master shutdown", brokerName);
}
}
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
index d057e705d..8a4428f3b 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
@@ -124,6 +124,11 @@ public class DledgerController implements Controller {
}
}
+ @Override
+ public boolean isLeaderState() {
+ return this.roleHandler.isLeaderState();
+ }
+
@Override
public CompletableFuture<RemotingCommand>
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
final SyncStateSet syncStateSet) {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index 1b0242e0d..f819ff1b3 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -63,6 +63,9 @@ public class BrokerInfo {
}
public Long getBrokerId(final String address) {
- return this.brokerIdTable.get(address);
+ if (this.brokerIdTable.containsKey(address)) {
+ return this.brokerIdTable.get(address);
+ }
+ return -1L;
}
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 85ac2734f..3f991df4e 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.ControllerConfig;
@@ -262,9 +263,13 @@ public class ReplicasInfoManager {
if (isContainsBroker(brokerName)) {
// If exist broker metadata, just return metadata
final InSyncReplicasInfo replicasInfo =
this.inSyncReplicasInfoTable.get(brokerName);
+ final BrokerInfo brokerInfo =
this.replicaInfoTable.get(brokerName);
final String masterAddress = replicasInfo.getMasterAddress();
response.setMasterAddress(masterAddress);
response.setMasterEpoch(replicasInfo.getMasterEpoch());
+ if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
+
response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
+ }
result.setBody(new SyncStateSet(replicasInfo.getSyncStateSet(),
replicasInfo.getSyncStateSetEpoch()).encode());
return result;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index afbf4dc58..f2fca86bd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -191,7 +191,7 @@ public class DefaultMessageStore implements MessageStore {
if (!messageStoreConfig.isEnableDLegerCommitLog() &&
!this.messageStoreConfig.isDuplicationEnable()) {
this.haService =
ServiceProvider.loadClass(ServiceProvider.HA_SERVICE_ID, HAService.class);
if (null == this.haService) {
- if (this.messageStoreConfig.isStartupControllerMode()) {
+ if (brokerConfig.isStartupControllerMode()) {
this.haService = new AutoSwitchHAService();
LOGGER.warn("Load AutoSwitch HA Service: {}",
AutoSwitchHAService.class.getSimpleName());
} else {
@@ -1952,7 +1952,7 @@ public class DefaultMessageStore implements MessageStore {
destroyMappedFileIntervalForcibly, cleanAtOnce,
deleteFileBatchMax);
if (deleteCount > 0) {
// If in the controller mode, we should notify the
AutoSwitchHaService to truncateEpochFile
- if
(DefaultMessageStore.this.messageStoreConfig.isStartupControllerMode()) {
+ if
(DefaultMessageStore.this.brokerConfig.isStartupControllerMode()) {
if (DefaultMessageStore.this.haService instanceof
AutoSwitchHAService) {
final long minPhyOffset = getMinPhyOffset();
((AutoSwitchHAService)
DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 99b88d521..a182e07cf 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -253,12 +253,6 @@ public class MessageStoreConfig {
@ImportantField
private boolean enableAutoInSyncReplicas = false;
- /**
- * Whether startup controller mode
- */
- @ImportantField
- private boolean startupControllerMode = false;
-
/**
* Enable or not ha flow control
*/
@@ -1174,14 +1168,6 @@ public class MessageStoreConfig {
this.enableAutoInSyncReplicas = enableAutoInSyncReplicas;
}
- public boolean isStartupControllerMode() {
- return startupControllerMode;
- }
-
- public void setStartupControllerMode(boolean startupControllerMode) {
- this.startupControllerMode = startupControllerMode;
- }
-
public boolean isHaFlowControlEnable() {
return haFlowControlEnable;
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 436dd1eed..8f74fbbb0 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -79,7 +79,6 @@ public class AutoSwitchHATest {
storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator +
"broker1" + File.separator + "EpochFileCache");
storeConfig1.setTotalReplicas(3);
storeConfig1.setInSyncReplicas(2);
- storeConfig1.setStartupControllerMode(true);
buildMessageStoreConfig(storeConfig1, mappedFileSize);
this.store1HaAddress = "127.0.0.1:10912";
@@ -91,7 +90,6 @@ public class AutoSwitchHATest {
storeConfig2.setHaListenPort(10943);
storeConfig2.setTotalReplicas(3);
storeConfig2.setInSyncReplicas(2);
- storeConfig2.setStartupControllerMode(true);
buildMessageStoreConfig(storeConfig2, mappedFileSize);
this.store2HaAddress = "127.0.0.1:10943";
@@ -106,7 +104,6 @@ public class AutoSwitchHATest {
storeConfig3.setHaListenPort(10980);
storeConfig3.setTotalReplicas(3);
storeConfig3.setInSyncReplicas(2);
- storeConfig3.setStartupControllerMode(true);
buildMessageStoreConfig(storeConfig3, mappedFileSize);
messageStore3 = buildMessageStore(storeConfig3, 3L);
this.store3HaAddress = "127.0.0.1:10980";
@@ -309,6 +306,7 @@ public class AutoSwitchHATest {
long brokerId) throws Exception {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerId(brokerId);
+ brokerConfig.setStartupControllerMode(true);
return new DefaultMessageStore(messageStoreConfig, brokerStatsManager,
null, brokerConfig);
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 4db66a718..0703c65a7 100644
---
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -82,6 +82,7 @@ public class AutoSwitchRoleBase {
brokerConfig.setControllerAddr(namesrvAddress);
brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(4 * 1000);
+ brokerConfig.setStartupControllerMode(true);
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(nettyListenPort);
@@ -110,7 +111,6 @@ public class AutoSwitchRoleBase {
storeConfig.setStorePathEpochFile(storePathRootDir + File.separator +
brokerName + File.separator + "EpochFileCache");
storeConfig.setTotalReplicas(3);
storeConfig.setInSyncReplicas(2);
- storeConfig.setStartupControllerMode(true);
storeConfig.setMappedFileSizeCommitLog(mappedFileSize);
storeConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 2279e77a4..d2219a870 100644
---
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -115,7 +115,7 @@ public class AutoSwitchRoleIntegrationTest extends
AutoSwitchRoleBase {
// Let master shutdown
brokerController1.shutdown();
this.brokerList.remove(this.brokerController1);
- Thread.sleep(5000);
+ Thread.sleep(6000);
// The slave should change to master
assertTrue(brokerController2.getReplicasManager().isMasterState());