This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c5f80713b [ISSUE #4813] Add elect policy for controller (#4809)
c5f80713b is described below
commit c5f80713be085e44c575e6ca25baba497b7d078c
Author: TheR1sing3un <[email protected]>
AuthorDate: Tue Aug 16 08:41:51 2022 +0800
[ISSUE #4813] Add elect policy for controller (#4809)
* feat(controller): add elect policy
1. add epoch and maxOffset in heartbeat.
2. refactor elect logic: a new master is now
elected by an elect policy (extensible).
3. add some unit tests
* refactor(controller): refactor some code and format some code
1. refactor some code and format some code
* fix typo in ReplicasInfoManager
1. fix typo in ReplicasInfoManager
* fix wrong method call in ReplicasManagerTest#before
1. fix wrong method call in ReplicasManagerTest#before
* fix wrong call in ReplicasManagerTest#before
1. fix wrong call in ReplicasManagerTest#before
* fix(controller): fix the bug about ReElectMaster
1. fix the bug about ReElectMaster
2. fix some typo
* style(controller): fix typo by checkstyle
1. fix typo by checkstyle
* test(broker): fix invalid usage of "any()" in
DLedgerControllerTest#before
1. fix invalid usage of "any()" in DLedgerControllerTest#before
* fix(controller): fix some wrong usage in test
1. fix some wrong usage in test
2. refactor some field type to avoid NPE
---
.../apache/rocketmq/broker/BrokerController.java | 4 +-
.../broker/controller/ReplicasManager.java | 13 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 282 +++++++++++----------
.../broker/controller/ReplicasManagerTest.java | 4 +-
.../namesrv/BrokerHeartbeatRequestHeader.java | 31 +++
.../RegisterBrokerToControllerRequestHeader.java | 41 ++-
.../controller/BrokerHeartbeatManager.java | 9 +-
.../apache/rocketmq/controller/BrokerLiveInfo.java | 129 ++++++++++
.../rocketmq/controller/ControllerManager.java | 86 ++++---
.../rocketmq/controller/elect/ElectPolicy.java | 36 +++
.../controller/elect/impl/DefaultElectPolicy.java | 126 +++++++++
.../controller/impl/DLedgerController.java | 32 ++-
.../impl/DefaultBrokerHeartbeatManager.java | 87 +++----
.../impl/manager/ReplicasInfoManager.java | 121 +++------
.../processor/ControllerRequestProcessor.java | 5 +-
.../impl/controller/ControllerManagerTest.java | 2 +-
.../controller/impl/DLedgerControllerTest.java | 20 +-
.../impl/DefaultBrokerHeartbeatManagerTest.java | 2 +-
.../impl/manager/ReplicasInfoManagerTest.java | 125 +++++++--
.../store/ha/autoswitch/AutoSwitchHAService.java | 4 +
20 files changed, 802 insertions(+), 357 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d5e93b388..84e1689ce 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1685,7 +1685,9 @@ public class BrokerController {
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
- this.brokerConfig.isInBrokerContainer()
+ this.brokerConfig.isInBrokerContainer(),
this.replicasManager.getLastEpoch(),
+ this.messageStore.getMaxPhyOffset(),
+ this.replicasManager.getConfirmOffset()
);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 679585074..b1b4ebd4b 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -91,6 +91,11 @@ public class ReplicasManager {
this.haService.setLocalAddress(this.localAddress);
}
+ public long getConfirmOffset() {
+ return this.haService.getConfirmOffset();
+ }
+
+
enum State {
INITIAL,
FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
@@ -280,7 +285,9 @@ public class ReplicasManager {
private boolean registerBrokerToController() {
// Register this broker to controller, get brokerId and masterAddress.
try {
- final RegisterBrokerToControllerResponseHeader registerResponse =
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(),
this.localAddress);
+ final RegisterBrokerToControllerResponseHeader registerResponse =
this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
+ this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.getBrokerName(), this.localAddress,
+ this.haService.getLastEpoch(),
this.brokerController.getMessageStore().getMaxPhyOffset());
final String newMasterAddress =
registerResponse.getMasterAddress();
if (StringUtils.isNoneEmpty(newMasterAddress)) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
@@ -421,6 +428,10 @@ public class ReplicasManager {
}
}
+ public int getLastEpoch() {
+ return this.haService.getLastEpoch();
+ }
+
public BrokerRole getBrokerRole() {
return this.brokerController.getMessageStoreConfig().getBrokerRole();
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6f89d4a7d..202fcbfdb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -125,7 +126,7 @@ public class BrokerOuterAPI {
private final TopAddressing topAddressing = new
DefaultTopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("brokerOutApi_thread_", true));
+ new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("brokerOutApi_thread_", true));
private ClientMetadata clientMetadata;
private RpcClient rpcClient;
@@ -178,13 +179,13 @@ public class BrokerOuterAPI {
}
public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String
brokerName)
- throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return syncBrokerMemberGroup(clusterName, brokerName, false);
}
public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String
brokerName,
- boolean isCompatibleWithOldNameSrv)
- throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
+ boolean
isCompatibleWithOldNameSrv)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
if (isCompatibleWithOldNameSrv) {
return getBrokerMemberGroupCompatible(clusterName, brokerName);
} else {
@@ -193,7 +194,7 @@ public class BrokerOuterAPI {
}
public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String
brokerName)
- throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
BrokerMemberGroup brokerMemberGroup = new
BrokerMemberGroup(clusterName, brokerName);
GetBrokerMemberGroupRequestHeader requestHeader = new
GetBrokerMemberGroupRequestHeader();
@@ -211,7 +212,7 @@ public class BrokerOuterAPI {
byte[] body = response.getBody();
if (body != null) {
GetBrokerMemberGroupResponseBody
brokerMemberGroupResponseBody =
- GetBrokerMemberGroupResponseBody.decode(body,
GetBrokerMemberGroupResponseBody.class);
+ GetBrokerMemberGroupResponseBody.decode(body,
GetBrokerMemberGroupResponseBody.class);
return
brokerMemberGroupResponseBody.getBrokerMemberGroup();
}
@@ -224,7 +225,7 @@ public class BrokerOuterAPI {
}
public BrokerMemberGroup getBrokerMemberGroupCompatible(String
clusterName, String brokerName)
- throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
BrokerMemberGroup brokerMemberGroup = new
BrokerMemberGroup(clusterName, brokerName);
GetRouteInfoRequestHeader requestHeader = new
GetRouteInfoRequestHeader();
@@ -243,8 +244,8 @@ public class BrokerOuterAPI {
TopicRouteData topicRouteData =
TopicRouteData.decode(body, TopicRouteData.class);
for (BrokerData brokerData :
topicRouteData.getBrokerDatas()) {
if (brokerData != null
- && brokerData.getBrokerName().equals(brokerName)
- && brokerData.getCluster().equals(clusterName)) {
+ &&
brokerData.getBrokerName().equals(brokerName)
+ &&
brokerData.getCluster().equals(clusterName)) {
brokerMemberGroup.getBrokerAddrs().putAll(brokerData.getBrokerAddrs());
break;
}
@@ -260,13 +261,13 @@ public class BrokerOuterAPI {
}
public void sendHeartbeatViaDataVersion(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final Long brokerId,
- final int timeoutMillis,
- final DataVersion dataVersion,
- final boolean isInBrokerContainer) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int timeoutMillis,
+ final DataVersion dataVersion,
+ final boolean isInBrokerContainer) {
List<String> nameServerAddressList =
this.remotingClient.getAvailableNameSrvList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0)
{
final QueryDataVersionRequestHeader requestHeader = new
QueryDataVersionRequestHeader();
@@ -295,11 +296,11 @@ public class BrokerOuterAPI {
}
public void sendHeartbeat(final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final Long brokerId,
- final int timeoutMills,
- final boolean isInBrokerContainer) {
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int timeoutMills,
+ final boolean isInBrokerContainer) {
List<String> nameServerAddressList =
this.remotingClient.getAvailableNameSrvList();
final BrokerHeartbeatRequestHeader requestHeader = new
BrokerHeartbeatRequestHeader();
@@ -326,8 +327,8 @@ public class BrokerOuterAPI {
}
public BrokerSyncInfo retrieveBrokerHaInfo(String masterBrokerAddr)
- throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
- MQBrokerException, RemotingCommandException {
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException, RemotingCommandException {
ExchangeHAInfoRequestHeader requestHeader = new
ExchangeHAInfoRequestHeader();
requestHeader.setMasterHaAddress(null);
@@ -348,7 +349,7 @@ public class BrokerOuterAPI {
}
public void sendBrokerHaInfo(String brokerAddr, String masterHaAddr, long
brokerInitMaxOffset, String masterAddr)
- throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
ExchangeHAInfoRequestHeader requestHeader = new
ExchangeHAInfoRequestHeader();
requestHeader.setMasterHaAddress(masterHaAddr);
requestHeader.setMasterFlushOffset(brokerInitMaxOffset);
@@ -371,30 +372,30 @@ public class BrokerOuterAPI {
}
public List<RegisterBrokerResult> registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills,
- final boolean enableActingMaster,
- final boolean compressed,
- final BrokerIdentity brokerIdentity) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final boolean oneway,
+ final int timeoutMills,
+ final boolean enableActingMaster,
+ final boolean compressed,
+ final BrokerIdentity brokerIdentity) {
return registerBrokerAll(clusterName,
- brokerAddr,
- brokerName,
- brokerId,
- haServerAddr,
- topicConfigWrapper,
- filterServerList,
- oneway, timeoutMills,
- enableActingMaster,
- compressed,
- null,
- brokerIdentity);
+ brokerAddr,
+ brokerName,
+ brokerId,
+ haServerAddr,
+ topicConfigWrapper,
+ filterServerList,
+ oneway, timeoutMills,
+ enableActingMaster,
+ compressed,
+ null,
+ brokerIdentity);
}
/**
@@ -410,23 +411,23 @@ public class BrokerOuterAPI {
* @param filterServerList
* @param oneway
* @param timeoutMills
- * @param compressed default false
+ * @param compressed default false
* @return
*/
public List<RegisterBrokerResult> registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills,
- final boolean enableActingMaster,
- final boolean compressed,
- final Long heartbeatTimeoutMillis,
- final BrokerIdentity brokerIdentity) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final boolean oneway,
+ final int timeoutMills,
+ final boolean enableActingMaster,
+ final boolean compressed,
+ final Long heartbeatTimeoutMillis,
+ final BrokerIdentity brokerIdentity) {
final List<RegisterBrokerResult> registerBrokerResultList = new
CopyOnWriteArrayList<>();
List<String> nameServerAddressList =
this.remotingClient.getAvailableNameSrvList();
@@ -483,13 +484,13 @@ public class BrokerOuterAPI {
}
private RegisterBrokerResult registerBroker(
- final String namesrvAddr,
- final boolean oneway,
- final int timeoutMills,
- final RegisterBrokerRequestHeader requestHeader,
- final byte[] body
+ final String namesrvAddr,
+ final boolean oneway,
+ final int timeoutMills,
+ final RegisterBrokerRequestHeader requestHeader,
+ final byte[] body
) throws RemotingCommandException, MQBrokerException,
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
- InterruptedException {
+ InterruptedException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER,
requestHeader);
request.setBody(body);
@@ -507,7 +508,7 @@ public class BrokerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
- (RegisterBrokerResponseHeader)
response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+ (RegisterBrokerResponseHeader)
response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
@@ -524,10 +525,10 @@ public class BrokerOuterAPI {
}
public void unregisterBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId
) {
List<String> nameServerAddressList =
this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
@@ -543,11 +544,11 @@ public class BrokerOuterAPI {
}
public void unregisterBroker(
- final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
+ final String namesrvAddr,
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId
) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new
UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
@@ -573,13 +574,13 @@ public class BrokerOuterAPI {
}
public List<Boolean> needRegister(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final int timeoutMills,
- final boolean isInBrokerContainer) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final int timeoutMills,
+ final boolean isInBrokerContainer) {
final List<Boolean> changedList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList =
this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0)
{
@@ -602,7 +603,7 @@ public class BrokerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryDataVersionResponseHeader
queryDataVersionResponseHeader =
- (QueryDataVersionResponseHeader)
response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+ (QueryDataVersionResponseHeader)
response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
changed =
queryDataVersionResponseHeader.getChanged();
byte[] body = response.getBody();
if (body != null) {
@@ -639,8 +640,8 @@ public class BrokerOuterAPI {
}
public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
- final String addr) throws RemotingConnectException,
RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ final String addr) throws RemotingConnectException,
RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request,
3000);
@@ -657,8 +658,8 @@ public class BrokerOuterAPI {
}
public TimerCheckpoint getTimerCheckPoint(
- final String addr) throws RemotingConnectException,
RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ final String addr) throws RemotingConnectException,
RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request,
3000);
@@ -671,12 +672,12 @@ public class BrokerOuterAPI {
break;
}
- throw new MQBrokerException(response.getCode(),
response.getRemark(),addr);
+ throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
}
public TimerMetrics.TimerMetricsSerializeWrapper getTimerMetrics(
- final String addr) throws RemotingConnectException,
RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ final String addr) throws RemotingConnectException,
RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request,
3000);
@@ -693,8 +694,8 @@ public class BrokerOuterAPI {
}
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
- final String addr) throws InterruptedException,
RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
+ final String addr) throws InterruptedException,
RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr,
request, 3000);
assert response != null;
@@ -710,8 +711,8 @@ public class BrokerOuterAPI {
}
public String getAllDelayOffset(
- final String addr) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException,
UnsupportedEncodingException {
+ final String addr) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException,
UnsupportedEncodingException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr,
request, 3000);
assert response != null;
@@ -727,8 +728,8 @@ public class BrokerOuterAPI {
}
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
- final String addr) throws InterruptedException,
RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
+ final String addr) throws InterruptedException,
RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
null);
RemotingCommand response = this.remotingClient.invokeSync(addr,
request, 3000);
assert response != null;
@@ -752,8 +753,8 @@ public class BrokerOuterAPI {
}
public long getMaxOffset(final String addr, final String topic, final int
queueId, final boolean committed,
- final boolean isOnlyThisBroker)
- throws RemotingException, MQBrokerException, InterruptedException {
+ final boolean isOnlyThisBroker)
+ throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new
GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
@@ -776,7 +777,7 @@ public class BrokerOuterAPI {
}
public long getMinOffset(final String addr, final String topic, final int
queueId, final boolean isOnlyThisBroker)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new
GetMinOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
@@ -798,10 +799,10 @@ public class BrokerOuterAPI {
}
public void lockBatchMQAsync(
- final String addr,
- final LockBatchRequestBody requestBody,
- final long timeoutMillis,
- final LockCallback callback) throws RemotingException,
InterruptedException {
+ final String addr,
+ final LockBatchRequestBody requestBody,
+ final long timeoutMillis,
+ final LockCallback callback) throws RemotingException,
InterruptedException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
@@ -815,7 +816,7 @@ public class BrokerOuterAPI {
if (response != null) {
if (response.getCode() == ResponseCode.SUCCESS) {
LockBatchResponseBody responseBody =
LockBatchResponseBody.decode(response.getBody(),
- LockBatchResponseBody.class);
+ LockBatchResponseBody.class);
Set<MessageQueue> messageQueues =
responseBody.getLockOKMQSet();
callback.onSuccess(messageQueues);
} else {
@@ -829,10 +830,10 @@ public class BrokerOuterAPI {
}
public void unlockBatchMQAsync(
- final String addr,
- final UnlockBatchRequestBody requestBody,
- final long timeoutMillis,
- final UnlockCallback callback) throws RemotingException,
InterruptedException {
+ final String addr,
+ final UnlockBatchRequestBody requestBody,
+ final long timeoutMillis,
+ final UnlockCallback callback) throws RemotingException,
InterruptedException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
@@ -862,8 +863,8 @@ public class BrokerOuterAPI {
}
public SendResult sendMessageToSpecificBroker(String brokerAddr, final
String brokerName,
- final MessageExt msg, String group,
- long timeoutMillis) throws RemotingException, MQBrokerException,
InterruptedException {
+ final MessageExt msg, String
group,
+ long timeoutMillis) throws
RemotingException, MQBrokerException, InterruptedException {
SendMessageRequestHeader requestHeader = new
SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
@@ -889,9 +890,9 @@ public class BrokerOuterAPI {
}
private SendResult processSendResponse(
- final String brokerName,
- final Message msg,
- final RemotingCommand response
+ final String brokerName,
+ final Message msg,
+ final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -918,7 +919,7 @@ public class BrokerOuterAPI {
}
SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader)
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ (SendMessageResponseHeader)
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
@@ -934,8 +935,8 @@ public class BrokerOuterAPI {
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
- uniqMsgId,
- responseHeader.getMsgId(), messageQueue,
responseHeader.getQueueOffset());
+ uniqMsgId,
+ responseHeader.getMsgId(), messageQueue,
responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId =
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn =
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -962,12 +963,12 @@ public class BrokerOuterAPI {
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic,
final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic,
final long timeoutMillis,
- boolean allowTopicNotExist) throws MQBrokerException,
InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException {
+ boolean
allowTopicNotExist) throws MQBrokerException, InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new
GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
@@ -1012,7 +1013,7 @@ public class BrokerOuterAPI {
}
public void forwardRequest(String brokerAddr, RemotingCommand request,
long timeoutMillis,
- InvokeCallback invokeCallback) throws InterruptedException,
RemotingSendRequestException, RemotingTimeoutException,
RemotingTooMuchRequestException, RemotingConnectException {
+ InvokeCallback invokeCallback) throws
InterruptedException, RemotingSendRequestException, RemotingTimeoutException,
RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis,
invokeCallback);
}
@@ -1030,8 +1031,8 @@ public class BrokerOuterAPI {
}
public MessageRequestModeSerializeWrapper getAllMessageRequestMode(
- final String addr) throws RemotingSendRequestException,
RemotingConnectException,
- MQBrokerException, RemotingTimeoutException, InterruptedException {
+ final String addr) throws RemotingSendRequestException,
RemotingConnectException,
+ MQBrokerException, RemotingTimeoutException, InterruptedException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE,
null);
RemotingCommand response = this.remotingClient.invokeSync(addr,
request, 3000);
assert response != null;
@@ -1060,10 +1061,10 @@ public class BrokerOuterAPI {
* Alter syncStateSet
*/
public SyncStateSet alterSyncStateSet(
- final String controllerAddress,
- final String brokerName,
- final String masterAddress, final int masterEpoch,
- final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws
Exception {
+ final String controllerAddress,
+ final String brokerName,
+ final String masterAddress, final int masterEpoch,
+ final Set<String> newSyncStateSet, final int syncStateSetEpoch)
throws Exception {
final AlterSyncStateSetRequestHeader requestHeader = new
AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET,
requestHeader);
@@ -1086,10 +1087,10 @@ public class BrokerOuterAPI {
* Register broker to controller
*/
public RegisterBrokerToControllerResponseHeader registerBrokerToController(
- final String controllerAddress, final String clusterName,
- final String brokerName, final String address) throws Exception {
+ final String controllerAddress, final String clusterName,
+ final String brokerName, final String address, final int epoch,
final long maxOffset) throws Exception {
- final RegisterBrokerToControllerRequestHeader requestHeader = new
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address);
+ final RegisterBrokerToControllerRequestHeader requestHeader = new
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address,
epoch, maxOffset);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
@@ -1108,7 +1109,7 @@ public class BrokerOuterAPI {
* Get broker replica info
*/
public Pair<GetReplicaInfoResponseHeader, SyncStateSet>
getReplicaInfo(final String controllerAddress,
- final String brokerName, final String brokerAddress) throws Exception {
+
final String brokerName, final String brokerAddress) throws Exception {
final GetReplicaInfoRequestHeader requestHeader = new
GetReplicaInfoRequestHeader(brokerName, brokerAddress);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1131,12 +1132,15 @@ public class BrokerOuterAPI {
* Send heartbeat to controller
*/
public void sendHeartbeatToController(final String controllerAddress,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final Long brokerId,
- final int timeoutMills,
- final boolean isInBrokerContainer) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int timeoutMills,
+ final boolean isInBrokerContainer,
+ final int epoch,
+ final long maxOffset,
+ final long confirmOffset) {
if (StringUtils.isEmpty(controllerAddress)) {
return;
}
@@ -1145,7 +1149,9 @@ public class BrokerOuterAPI {
requestHeader.setClusterName(clusterName);
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerName(brokerName);
-
+ requestHeader.setEpoch(epoch);
+ requestHeader.setMaxOffset(maxOffset);
+ requestHeader.setConfirmOffset(confirmOffset);
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
@Override
public void run2() {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index bb0c83502..b7ab79eda 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -40,6 +40,8 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -121,7 +123,7 @@ public class ReplicasManagerTest {
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
- when(brokerOuterAPI.registerBrokerToController(any(), any(), any(),
any())).thenReturn(registerBrokerToControllerResponseHeader);
+ when(brokerOuterAPI.registerBrokerToController(any(), any(), any(),
any(), anyInt(),
anyLong())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(),
any())).thenReturn(result);
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
index 52c3abef0..4c45e9b4a 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
@@ -28,6 +29,12 @@ public class BrokerHeartbeatRequestHeader implements
CommandCustomHeader {
private String brokerAddr;
@CFNotNull
private String brokerName;
+ @CFNullable
+ private Integer epoch;
+ @CFNullable
+ private Long maxOffset;
+ @CFNullable
+ private Long confirmOffset;
@Override
public void checkFields() throws RemotingCommandException {
@@ -57,4 +64,28 @@ public class BrokerHeartbeatRequestHeader implements
CommandCustomHeader {
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
+
+ public Integer getEpoch() {
+ return epoch;
+ }
+
+ public void setEpoch(Integer epoch) {
+ this.epoch = epoch;
+ }
+
+ public Long getMaxOffset() {
+ return maxOffset;
+ }
+
+ public void setMaxOffset(Long maxOffset) {
+ this.maxOffset = maxOffset;
+ }
+
+ public Long getConfirmOffset() {
+ return confirmOffset;
+ }
+
+ public void setConfirmOffset(Long confirmOffset) {
+ this.confirmOffset = confirmOffset;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
index 9a0332cef..1028ead6e 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerToControllerRequestHeader.java
@@ -25,6 +25,10 @@ public class RegisterBrokerToControllerRequestHeader
implements CommandCustomHea
private String brokerName;
private String brokerAddress;
@CFNullable
+ private Integer epoch;
+ @CFNullable
+ private Long maxOffset;
+ @CFNullable
private Long heartbeatTimeoutMillis;
@@ -37,6 +41,14 @@ public class RegisterBrokerToControllerRequestHeader
implements CommandCustomHea
this.brokerAddress = brokerAddress;
}
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String
brokerName, String brokerAddress, int epoch, long maxOffset) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ this.epoch = epoch;
+ this.maxOffset = maxOffset;
+ }
+
public String getClusterName() {
return clusterName;
}
@@ -71,11 +83,30 @@ public class RegisterBrokerToControllerRequestHeader
implements CommandCustomHea
@Override
public String toString() {
- return "RegisterBrokerRequestHeader{" +
- "clusterName='" + clusterName + '\'' +
- ", brokerName='" + brokerName + '\'' +
- ", brokerAddress='" + brokerAddress + '\'' +
- '}';
+ return "RegisterBrokerToControllerRequestHeader{" +
+ "clusterName='" + clusterName + '\'' +
+ ", brokerName='" + brokerName + '\'' +
+ ", brokerAddress='" + brokerAddress + '\'' +
+ ", epoch=" + epoch +
+ ", maxOffset=" + maxOffset +
+ ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+ '}';
+ }
+
+ public Integer getEpoch() {
+ return epoch;
+ }
+
+ public void setEpoch(Integer epoch) {
+ this.epoch = epoch;
+ }
+
+ public Long getMaxOffset() {
+ return maxOffset;
+ }
+
+ public void setMaxOffset(Long maxOffset) {
+ this.maxOffset = maxOffset;
}
@Override
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index e2ff58301..364b32647 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -23,7 +23,7 @@ public interface BrokerHeartbeatManager {
/**
* Broker new heartbeat.
*/
- void onBrokerHeartbeat(final String clusterName, final String brokerAddr);
+ void onBrokerHeartbeat(final String clusterName, final String brokerAddr,
final Integer epoch, final Long maxOffset, final Long confirmOffset);
/**
* Change the metadata(brokerId ..) for a broker.
@@ -49,13 +49,18 @@ public interface BrokerHeartbeatManager {
* Register new broker to heartManager.
*/
void registerBroker(final String clusterName, final String brokerName,
final String brokerAddr, final long brokerId,
- final Long timeoutMillis, final Channel channel);
+ final Long timeoutMillis, final Channel channel, final
Integer epoch, final Long maxOffset);
/**
* Broker channel close
*/
void onBrokerChannelClose(final Channel channel);
+ /**
+ * Get broker live information by clusterName and brokerAddr
+ */
+ BrokerLiveInfo getBrokerLiveInfo(String clusterName, String brokerAddr);
+
/**
* Check whether broker active
*/
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
new file mode 100644
index 000000000..e88b26c39
--- /dev/null
+++
b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+
+import io.netty.channel.Channel;
+
+
+public class BrokerLiveInfo {
+    private final String brokerName;
+    private final String brokerAddr;
+    private final long heartbeatTimeoutMillis;
+    private final Channel channel;
+    // the fields below are not final: each of them can be replaced through its setter
+    private long brokerId;
+    private long lastUpdateTimestamp;
+    private int epoch;
+    private long maxOffset;
+    private long confirmOffset;
+
+    /**
+     * Creates the live info without an explicit confirmOffset, which is then 0.
+     */
+    public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
+        Channel channel, int epoch, long maxOffset) {
+        this(brokerName, brokerAddr, brokerId, lastUpdateTimestamp, heartbeatTimeoutMillis, channel, epoch, maxOffset, 0L);
+    }
+
+    /**
+     * Creates the live info with every field given explicitly.
+     */
+    public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
+        Channel channel, int epoch, long maxOffset, long confirmOffset) {
+        this.brokerName = brokerName;
+        this.brokerAddr = brokerAddr;
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+        this.channel = channel;
+        this.brokerId = brokerId;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+        this.epoch = epoch;
+        this.maxOffset = maxOffset;
+        this.confirmOffset = confirmOffset;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public long getHeartbeatTimeoutMillis() {
+        return heartbeatTimeoutMillis;
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public int getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public long getMaxOffset() {
+        return maxOffset;
+    }
+
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+    public long getConfirmOffset() {
+        return confirmOffset;
+    }
+
+    public void setConfirmOffset(long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("BrokerLiveInfo{");
+        text.append("brokerName='").append(brokerName).append('\'');
+        text.append(", brokerAddr='").append(brokerAddr).append('\'');
+        text.append(", heartbeatTimeoutMillis=").append(heartbeatTimeoutMillis);
+        text.append(", channel=").append(channel);
+        text.append(", brokerId=").append(brokerId);
+        text.append(", lastUpdateTimestamp=").append(lastUpdateTimestamp);
+        text.append(", epoch=").append(epoch);
+        text.append(", maxOffset=").append(maxOffset);
+        text.append(", confirmOffset=").append(confirmOffset);
+        return text.append('}').toString();
+    }
+}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 4f0e1e75d..cd4a60158 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.MixAll;
@@ -36,6 +37,7 @@ import
org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import
org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
@@ -63,15 +65,12 @@ public class ControllerManager {
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
public ControllerManager(ControllerConfig controllerConfig,
NettyServerConfig nettyServerConfig,
- NettyClientConfig nettyClientConfig) {
+ NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
- this.configuration = new Configuration(
- log,
- this.controllerConfig, this.nettyServerConfig
- );
+ this.configuration = new Configuration(log, this.controllerConfig,
this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.controllerConfig,
"configStorePath");
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
}
@@ -79,12 +78,12 @@ public class ControllerManager {
public boolean initialize() {
this.controllerRequestThreadPoolQueue = new
LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
this.controllerRequestExecutor = new ThreadPoolExecutor(
- this.controllerConfig.getControllerThreadPoolNums(),
- this.controllerConfig.getControllerThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.controllerRequestThreadPoolQueue,
- new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+ this.controllerConfig.getControllerThreadPoolNums(),
+ this.controllerConfig.getControllerThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.controllerRequestThreadPoolQueue,
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable
runnable, final T value) {
return new FutureTaskExt<T>(runnable, value);
@@ -97,39 +96,46 @@ public class ControllerManager {
if
(StringUtils.isEmpty(this.controllerConfig.getControllerDLegerSelfId())) {
throw new IllegalArgumentException("Attribute value
controllerDLegerSelfId of ControllerConfig is null or empty");
}
- this.controller = new DLedgerController(this.controllerConfig,
(cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster,
brokerAddr),
- this.nettyServerConfig, this.nettyClientConfig,
this.brokerHousekeepingService);
+ this.controller = new DLedgerController(this.controllerConfig,
this.heartbeatManager::isBrokerActive,
+ this.nettyServerConfig, this.nettyClientConfig,
this.brokerHousekeepingService,
+ new DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo));
// Register broker inactive listener
- this.heartbeatManager.addBrokerLifecycleListener(new
BrokerHeartbeatManager.BrokerLifecycleListener() {
- @Override
- public void onBrokerInactive(String clusterName, String
brokerName, String brokerAddress, long brokerId) {
- if (brokerId == MixAll.MASTER_ID) {
- if (controller.isLeaderState()) {
- final CompletableFuture<RemotingCommand> future =
controller.electMaster(new ElectMasterRequestHeader(brokerName));
- try {
- final RemotingCommand response = future.get(5,
TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
- if (responseHeader != null) {
- log.info("Broker {}'s master {} shutdown,
elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
- if
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
-
heartbeatManager.changeBrokerMetadata(clusterName,
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
- }
-
- if
(controllerConfig.isNotifyBrokerRoleChanged()) {
- notifyBrokerRoleChanged(responseHeader,
clusterName);
- }
- }
- } catch (Exception ignored) {
+
this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
+ registerProcessor();
+ return true;
+ }
+
+ /**
+ * Called when the heartbeatManager detects that a broker is no longer active. If that broker was a master and
+ * this controller is the leader, elect a new master, update its metadata and, if configured, notify the role change.
+ * @param clusterName The cluster name of this inactive broker
+ * @param brokerName The inactive broker name
+ * @param brokerAddress The inactive broker address(ip)
+ * @param brokerId The inactive broker id
+ */
+ private void onBrokerInactive(String clusterName, String brokerName,
String brokerAddress, long brokerId) {
+ if (brokerId == MixAll.MASTER_ID) {
+ if (controller.isLeaderState()) {
+ final CompletableFuture<RemotingCommand> future =
controller.electMaster(new ElectMasterRequestHeader(brokerName));
+ try {
+ final RemotingCommand response = future.get(5,
TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
+ if (responseHeader != null) {
+ log.info("Broker {}'s master {} shutdown, elect a new
master done, result:{}", brokerName, brokerAddress, responseHeader);
+ if
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
+ heartbeatManager.changeBrokerMetadata(clusterName,
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
+ }
+ if (controllerConfig.isNotifyBrokerRoleChanged()) {
+ notifyBrokerRoleChanged(responseHeader,
clusterName);
}
- } else {
- log.info("Broker{}' master shutdown", brokerName);
}
+ } catch (Exception ignored) {
}
+ } else {
+ log.info("Broker{}' master shutdown", brokerName);
}
- });
- registerProcessor();
- return true;
+ }
}
/**
@@ -156,11 +162,11 @@ public class ControllerManager {
}
public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long
brokerId,
- final ElectMasterResponseHeader responseHeader) {
+ final ElectMasterResponseHeader
responseHeader) {
if (StringUtils.isNoneEmpty(brokerAddr)) {
log.info("Try notify broker {} with id {} that role changed,
responseHeader:{}", brokerAddr, brokerId, responseHeader);
final NotifyBrokerRoleChangedRequestHeader requestHeader = new
NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
- responseHeader.getMasterEpoch(),
responseHeader.getSyncStateSetEpoch(), brokerId);
+ responseHeader.getMasterEpoch(),
responseHeader.getSyncStateSetEpoch(), brokerId);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED,
requestHeader);
try {
this.remotingClient.invokeOneway(brokerAddr, request, 3000);
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
new file mode 100644
index 000000000..214012e51
--- /dev/null
+++
b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.elect;
+
+
+import java.util.Set;
+
+public interface ElectPolicy {
+
+ /**
+ * Elect a master for a broker group.
+ *
+ * @param clusterName the cluster that the broker group belongs to
+ * @param syncStateBrokers all broker replicas in the syncStateSet
+ * @param allReplicaBrokers all broker replicas
+ * @param oldMaster the old master
+ * @param preferBrokerAddr the broker that is preferred to be elected
+ * @return the new master's brokerAddr
+ */
+ String elect(String clusterName, Set<String> syncStateBrokers, Set<String>
allReplicaBrokers, String oldMaster, String preferBrokerAddr);
+
+}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
new file mode 100644
index 000000000..c1b2a50d5
--- /dev/null
+++
b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.elect.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.BrokerLiveInfo;
+
+import java.util.Comparator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+
+public class DefaultElectPolicy implements ElectPolicy {
+
+ // <clusterName, brokerAddr> valid predicate
+ private BiPredicate<String, String> validPredicate;
+
+ // <clusterName, brokerAddr, info> getter to get more information
+ private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter;
+
+ private final Comparator<BrokerLiveInfo> comparator = (x, y) -> {
+ return x.getEpoch() == y.getEpoch() ? (int) (y.getMaxOffset() -
x.getMaxOffset()) : y.getEpoch() - x.getEpoch();
+ };
+
+ public DefaultElectPolicy(BiPredicate<String, String> validPredicate,
BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
+ this.validPredicate = validPredicate;
+ this.additionalInfoGetter = additionalInfoGetter;
+ }
+
+ public DefaultElectPolicy() {
+
+ }
+
+ /**
+ * try to elect a master, if old master still alive, now we do nothing,
+ * if preferBrokerAddr is not blank, that means we must elect a new master,
+ * and we should check if the preferBrokerAddr is valid, if so we should
elect it as
+ * new master, if else we should elect nothing.
+ * @param clusterName the brokerGroup belongs
+ * @param syncStateBrokers all broker replicas in syncStateSet
+ * @param allReplicaBrokers all broker replicas
+ * @param oldMaster old master
+ * @param preferBrokerAddr the broker prefer to be elected
+ * @return master elected by our own policy
+ */
+ @Override
+ public String elect(String clusterName, Set<String> syncStateBrokers,
Set<String> allReplicaBrokers, String oldMaster, String preferBrokerAddr) {
+ String newMaster = null;
+ // try to elect in syncStateBrokers
+ if (syncStateBrokers != null) {
+ newMaster = tryElect(clusterName, syncStateBrokers, oldMaster,
preferBrokerAddr);
+ }
+ if (StringUtils.isNotEmpty(newMaster)) {
+ return newMaster;
+ }
+ // try to elect in all replicas
+ if (allReplicaBrokers != null) {
+ newMaster = tryElect(clusterName, allReplicaBrokers, oldMaster,
preferBrokerAddr);
+ }
+ return newMaster;
+ }
+
+
+ private String tryElect(String clusterName, Set<String> brokers, String
oldMaster, String preferBrokerAddr) {
+ if (this.validPredicate != null) {
+ brokers = brokers.stream().filter(brokerAddr ->
this.validPredicate.test(clusterName, brokerAddr)).collect(Collectors.toSet());
+ }
+ // try to elect in brokers
+ if (brokers.size() >= 1) {
+ if (brokers.contains(oldMaster) &&
(StringUtils.isBlank(preferBrokerAddr) || preferBrokerAddr.equals(oldMaster))) {
+ // old master still valid, and our preferBrokerAddr is blank
or is equals to oldMaster
+ return oldMaster;
+ }
+ // if preferBrokerAddr is not blank, if preferBrokerAddr is valid,
we choose it, else we choose nothing
+ if (StringUtils.isNotBlank(preferBrokerAddr)) {
+ return brokers.contains(preferBrokerAddr) ? preferBrokerAddr :
null;
+ }
+ if (this.additionalInfoGetter != null) {
+ // get more information from getter
+ // sort brokerLiveInfos by epoch, maxOffset
+ TreeSet<BrokerLiveInfo> brokerLiveInfos = new
TreeSet<>(this.comparator);
+ brokers.forEach(brokerAddr ->
brokerLiveInfos.add(this.additionalInfoGetter.apply(clusterName, brokerAddr)));
+ if (brokerLiveInfos.size() >= 1) {
+ return brokerLiveInfos.first().getBrokerAddr();
+ }
+ }
+ // elect random
+ return brokers.iterator().next();
+ }
+ return null;
+ }
+
+
+ public BiFunction<String, String, BrokerLiveInfo>
getAdditionalInfoGetter() {
+ return additionalInfoGetter;
+ }
+
+ public void setAdditionalInfoGetter(BiFunction<String, String,
BrokerLiveInfo> additionalInfoGetter) {
+ this.additionalInfoGetter = additionalInfoGetter;
+ }
+
+ public BiPredicate<String, String> getValidPredicate() {
+ return validPredicate;
+ }
+
+ public void setValidPredicate(BiPredicate<String, String> validPredicate) {
+ this.validPredicate = validPredicate;
+ }
+}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 3a5480140..c6a8b3345 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -24,6 +24,7 @@ import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
+
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -48,6 +50,8 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.event.EventSerializer;
@@ -77,20 +81,25 @@ public class DLedgerController implements Controller {
private final DLedgerControllerStateMachine statemachine;
// Usr for checking whether the broker is alive
private BiPredicate<String, String> brokerAlivePredicate;
+ // used to elect a master
+ private ElectPolicy electPolicy;
+
+
private AtomicBoolean isScheduling = new AtomicBoolean(false);
public DLedgerController(final ControllerConfig config, final
BiPredicate<String, String> brokerAlivePredicate) {
- this(config, brokerAlivePredicate, null, null, null);
+ this(config, brokerAlivePredicate, null, null, null, null);
}
public DLedgerController(final ControllerConfig controllerConfig,
- final BiPredicate<String, String> brokerAlivePredicate, final
NettyServerConfig nettyServerConfig,
- final NettyClientConfig nettyClientConfig, final ChannelEventListener
channelEventListener) {
+ final BiPredicate<String, String>
brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig, final
ChannelEventListener channelEventListener,
+ final ElectPolicy electPolicy) {
this.controllerConfig = controllerConfig;
this.eventSerializer = new EventSerializer();
this.scheduler = new EventScheduler();
this.brokerAlivePredicate = brokerAlivePredicate;
-
+ this.electPolicy = electPolicy == null ? new DefaultElectPolicy() :
electPolicy;
this.dLedgerConfig = new DLedgerConfig();
this.dLedgerConfig.setGroup(controllerConfig.getControllerDLegerGroup());
this.dLedgerConfig.setPeers(controllerConfig.getControllerDLegerPeers());
@@ -145,7 +154,7 @@ public class DLedgerController implements Controller {
@Override
public CompletableFuture<RemotingCommand>
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
- final SyncStateSet syncStateSet) {
+ final
SyncStateSet syncStateSet) {
return this.scheduler.appendEvent("alterSyncStateSet",
() -> this.replicasInfoManager.alterSyncStateSet(request,
syncStateSet, this.brokerAlivePredicate), true);
}
@@ -153,7 +162,7 @@ public class DLedgerController implements Controller {
@Override
public CompletableFuture<RemotingCommand> electMaster(final
ElectMasterRequestHeader request) {
return this.scheduler.appendEvent("electMaster",
- () -> this.replicasInfoManager.electMaster(request,
this.brokerAlivePredicate), true);
+ () -> this.replicasInfoManager.electMaster(request,
this.electPolicy), true);
}
@Override
@@ -170,7 +179,6 @@ public class DLedgerController implements Controller {
@Override
public CompletableFuture<RemotingCommand> getSyncStateData(List<String>
brokerNames) {
-
return this.scheduler.appendEvent("getSyncStateData",
() -> this.replicasInfoManager.getSyncStateData(brokerNames),
false);
}
@@ -185,7 +193,7 @@ public class DLedgerController implements Controller {
sb.append(peer).append(";");
}
return
RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new
GetMetaDataResponseHeader(
- state.getGroup(), state.getLeaderId(), state.getLeaderAddr(),
state.isLeader(), sb.toString()));
+ state.getGroup(), state.getLeaderId(), state.getLeaderAddr(),
state.isLeader(), sb.toString()));
}
@Override
@@ -220,6 +228,10 @@ public class DLedgerController implements Controller {
this.brokerAlivePredicate = brokerAlivePredicate;
}
+ public void setElectPolicy(ElectPolicy electPolicy) {
+ this.electPolicy = electPolicy;
+ }
+
/**
* Event handler that handle event
*/
@@ -276,7 +288,7 @@ public class DLedgerController implements Controller {
}
public <T> CompletableFuture<RemotingCommand> appendEvent(final String
name,
- final Supplier<ControllerResult<T>> supplier, boolean
isWriteEvent) {
+ final
Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
if (isStopped() ||
!DLedgerController.this.roleHandler.isLeaderState()) {
final RemotingCommand command =
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The
controller is not in leader state");
final CompletableFuture<RemotingCommand> future = new
CompletableFuture<>();
@@ -313,7 +325,7 @@ public class DLedgerController implements Controller {
private final boolean isWriteEvent;
ControllerEventHandler(final String name, final
Supplier<ControllerResult<T>> supplier,
- final boolean isWriteEvent) {
+ final boolean isWriteEvent) {
this.name = name;
this.supplier = supplier;
this.future = new CompletableFuture<>();
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 21b3c89c9..95cc85197 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.controller.impl;
import io.netty.channel.Channel;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -26,11 +27,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.common.BrokerAddrInfo;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.BrokerLiveInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -68,17 +71,17 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
final Iterator<Map.Entry<BrokerAddrInfo, BrokerLiveInfo>> iterator
= this.brokerLiveTable.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next =
iterator.next();
- long last = next.getValue().lastUpdateTimestamp;
- long timeoutMillis = next.getValue().heartbeatTimeoutMillis;
+ long last = next.getValue().getLastUpdateTimestamp();
+ long timeoutMillis =
next.getValue().getHeartbeatTimeoutMillis();
if ((last + timeoutMillis) < System.currentTimeMillis()) {
- final Channel channel = next.getValue().channel;
+ final Channel channel = next.getValue().getChannel();
iterator.remove();
if (channel != null) {
RemotingUtil.closeChannel(channel);
}
this.executor.submit(() ->
- notifyBrokerInActive(next.getKey().getClusterName(),
next.getValue().brokerName, next.getKey().getBrokerAddr(),
next.getValue().brokerId));
- log.warn("The broker channel {} expired, brokerInfo {},
expired {}ms", next.getValue().channel, next.getKey(), timeoutMillis);
+
notifyBrokerInActive(next.getKey().getClusterName(),
next.getValue().getBrokerName(), next.getKey().getBrokerAddr(),
next.getValue().getBrokerId()));
+ log.warn("The broker channel {} expired, brokerInfo {},
expired {}ms", next.getValue().getChannel(), next.getKey(), timeoutMillis);
}
}
} catch (Exception e) {
@@ -99,14 +102,15 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
@Override
public void registerBroker(String clusterName, String brokerName, String
brokerAddr,
- long brokerId, Long timeoutMillis, Channel channel) {
+ long brokerId, Long timeoutMillis, Channel
channel, Integer epoch, Long maxOffset) {
final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName,
brokerAddr);
final BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(addrInfo,
- new BrokerLiveInfo(brokerName,
- brokerId,
- System.currentTimeMillis(),
- timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME :
timeoutMillis,
- channel));
+ new BrokerLiveInfo(brokerName,
+ brokerAddr,
+ brokerId,
+ System.currentTimeMillis(),
+ timeoutMillis == null ?
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+ channel, epoch == null ? -1 : epoch, maxOffset == null
? -1 : maxOffset));
if (prevBrokerLiveInfo == null) {
log.info("new broker registered, {}, brokerId:{}", addrInfo,
brokerId);
}
@@ -117,17 +121,30 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
if (prev != null) {
- prev.brokerId = brokerId;
+ prev.setBrokerId(brokerId);
log.info("Change broker {}'s brokerId to {}", brokerAddr,
brokerId);
}
}
@Override
- public void onBrokerHeartbeat(String clusterName, String brokerAddr) {
+ public void onBrokerHeartbeat(String clusterName, String brokerAddr,
Integer epoch, Long maxOffset, Long confirmOffset) {
BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
+ int realEpoch = epoch == null ? -1 : epoch;
+ long realMaxOffset = maxOffset == null ? -1 : maxOffset;
+ long realConfirmOffset = confirmOffset == null ? -1 : confirmOffset;
if (prev != null) {
- prev.lastUpdateTimestamp = System.currentTimeMillis();
+ prev.setLastUpdateTimestamp(System.currentTimeMillis());
+ if (realEpoch > prev.getEpoch()) {
+ prev.setEpoch(realEpoch);
+ prev.setMaxOffset(realMaxOffset);
+ prev.setConfirmOffset(realConfirmOffset);
+ } else if (realEpoch == prev.getEpoch()) {
+ if (realMaxOffset > prev.getMaxOffset()) {
+ prev.setMaxOffset(realMaxOffset);
+ prev.setConfirmOffset(realConfirmOffset);
+ }
+ }
}
}
@@ -135,11 +152,11 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
public void onBrokerChannelClose(Channel channel) {
BrokerAddrInfo addrInfo = null;
for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry :
this.brokerLiveTable.entrySet()) {
- if (entry.getValue().channel == channel) {
- log.info("Channel {} inactive, broker {}, addr:{}, id:{}",
entry.getValue().channel, entry.getValue().brokerName,
entry.getKey().getBrokerAddr(), entry.getValue().brokerId);
+ if (entry.getValue().getChannel() == channel) {
+ log.info("Channel {} inactive, broker {}, addr:{}, id:{}",
entry.getValue().getChannel(), entry.getValue().getBrokerName(),
entry.getKey().getBrokerAddr(), entry.getValue().getBrokerId());
addrInfo = entry.getKey();
this.executor.submit(() ->
- notifyBrokerInActive(entry.getKey().getClusterName(),
entry.getValue().brokerName, entry.getKey().getBrokerAddr(),
entry.getValue().brokerId));
+ notifyBrokerInActive(entry.getKey().getClusterName(),
entry.getValue().getBrokerName(), entry.getKey().getBrokerAddr(),
entry.getValue().getBrokerId()));
break;
}
}
@@ -148,42 +165,20 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
}
}
+ @Override
+ public BrokerLiveInfo getBrokerLiveInfo(String clusterName, String
brokerAddr) {
+ return this.brokerLiveTable.get(new BrokerAddrInfo(clusterName,
brokerAddr));
+ }
+
@Override
public boolean isBrokerActive(String clusterName, String brokerAddr) {
final BrokerLiveInfo info = this.brokerLiveTable.get(new
BrokerAddrInfo(clusterName, brokerAddr));
if (info != null) {
- long last = info.lastUpdateTimestamp;
- long timeoutMillis = info.heartbeatTimeoutMillis;
+ long last = info.getLastUpdateTimestamp();
+ long timeoutMillis = info.getHeartbeatTimeoutMillis();
return (last + timeoutMillis) >= System.currentTimeMillis();
}
return false;
}
- static class BrokerLiveInfo {
- private final String brokerName;
- private final long heartbeatTimeoutMillis;
- private final Channel channel;
- private long brokerId;
- private long lastUpdateTimestamp;
-
- public BrokerLiveInfo(String brokerName, long brokerId, long
lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel) {
- this.brokerName = brokerName;
- this.brokerId = brokerId;
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
- this.channel = channel;
- }
-
- @Override
- public String toString() {
- return "BrokerLiveInfo{" +
- "brokerName='" + brokerName + '\'' +
- ", brokerId=" + brokerId +
- ", lastUpdateTimestamp=" + lastUpdateTimestamp +
- ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
- ", channel=" + channel +
- '}';
- }
- }
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index c63ca0bf1..ed123aeb2 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
-import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
@@ -41,6 +40,7 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
@@ -68,8 +68,8 @@ public class ReplicasInfoManager {
}
public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
- final AlterSyncStateSetRequestHeader request, final SyncStateSet
syncStateSet,
- final BiPredicate<String, String> brokerAlivePredicate) {
+ final AlterSyncStateSetRequestHeader request, final SyncStateSet
syncStateSet,
+ final BiPredicate<String, String> brokerAlivePredicate) {
final String brokerName = request.getBrokerName();
final ControllerResult<AlterSyncStateSetResponseHeader> result = new
ControllerResult<>(new AlterSyncStateSetResponseHeader());
final AlterSyncStateSetResponseHeader response = result.getResponse();
@@ -91,7 +91,7 @@ public class ReplicasInfoManager {
// Check master
if
(!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
String err = String.format("Rejecting alter syncStateSet
request because the current leader is:{%s}, not {%s}",
- syncStateInfo.getMasterAddress(),
request.getMasterAddress());
+ syncStateInfo.getMasterAddress(),
request.getMasterAddress());
log.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err);
return result;
@@ -100,7 +100,7 @@ public class ReplicasInfoManager {
// Check master epoch
if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
String err = String.format("Rejecting alter syncStateSet
request because the current master epoch is:{%d}, not {%d}",
- syncStateInfo.getMasterEpoch(),
request.getMasterEpoch());
+ syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
log.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err);
return result;
@@ -109,7 +109,7 @@ public class ReplicasInfoManager {
// Check syncStateSet epoch
if (syncStateSet.getSyncStateSetEpoch() !=
syncStateInfo.getSyncStateSetEpoch()) {
String err = String.format("Rejecting alter syncStateSet
request because the current syncStateSet epoch is:{%d}, not {%d}",
- syncStateInfo.getSyncStateSetEpoch(),
syncStateSet.getSyncStateSetEpoch());
+ syncStateInfo.getSyncStateSetEpoch(),
syncStateSet.getSyncStateSetEpoch());
log.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH,
err);
return result;
@@ -150,8 +150,7 @@ public class ReplicasInfoManager {
return result;
}
- public ControllerResult<ElectMasterResponseHeader> electMaster(
- final ElectMasterRequestHeader request, final BiPredicate<String,
String> brokerAlivePredicate) {
+ public ControllerResult<ElectMasterResponseHeader> electMaster(final
ElectMasterRequestHeader request, final ElectPolicy electPolicy) {
final String brokerName = request.getBrokerName();
final String assignBrokerAddress = request.getBrokerAddress();
final ControllerResult<ElectMasterResponseHeader> result = new
ControllerResult<>(new ElectMasterResponseHeader());
@@ -159,87 +158,24 @@ public class ReplicasInfoManager {
final SyncStateInfo syncStateInfo =
this.syncStateSetInfoTable.get(brokerName);
final BrokerInfo brokerInfo =
this.replicaInfoTable.get(brokerName);
final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
- // First, check whether the master is still active
final String oldMaster = syncStateInfo.getMasterAddress();
- if (StringUtils.isNoneEmpty(oldMaster) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), oldMaster)) {
+ Set<String> allReplicaBrokers =
controllerConfig.isEnableElectUncleanMaster() ? brokerInfo.getAllBroker() :
null;
- if (StringUtils.isBlank(assignBrokerAddress)) {
- String err = String.format("The old master %s is still
alive, no need to elect new master for broker %s", oldMaster,
brokerInfo.getBrokerName());
- log.warn("{}", err);
-
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
- return result;
- }
-
- if (StringUtils.equals(oldMaster, assignBrokerAddress)) {
- String err = String.format("The Re-elect master is the
same as the old master %s which is still alive, no need to elect new master for
broker %s", oldMaster, brokerInfo.getBrokerName());
- log.warn("{}", err);
-
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
- return result;
- }
- }
-
- // Try elect a master in syncStateSet
- if (syncStateSet.size() > 1) {
- boolean electSuccess = tryElectMaster(result, brokerName,
assignBrokerAddress, syncStateSet, candidate ->
- !candidate.equals(syncStateInfo.getMasterAddress()) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
- if (electSuccess) {
- return result;
- }
- }
-
- // Try elect a master in lagging replicas if
enableElectUncleanMaster = true
- if (controllerConfig.isEnableElectUncleanMaster()) {
- boolean electSuccess = tryElectMaster(result, brokerName,
assignBrokerAddress, brokerInfo.getAllBroker(), candidate ->
- !candidate.equals(syncStateInfo.getMasterAddress()) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
- if (electSuccess) {
- return result;
- }
- }
-
- // If elect failed, we still need to apply an ElectMasterEvent to
tell the statemachine
- // that the master was shutdown and no new master was elected.
- final ElectMasterEvent event = new ElectMasterEvent(false,
brokerName);
- result.addEvent(event);
-
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Failed
to elect a new broker master");
- return result;
- }
- result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST,
"Broker metadata is not existed");
- return result;
- }
-
- /**
- * Try elect a new master in candidates
- *
- * @param filter return true if the candidate is available
- * @return true if elect success
- */
- private boolean tryElectMaster(final
ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
- final String assignBrokerAddress, final
Set<String> candidates, final Predicate<String> filter) {
- final int masterEpoch =
this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
- final int syncStateSetEpoch =
this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
-
- //Re-elect the assigned broker as master
- if (StringUtils.isNotBlank(assignBrokerAddress) &&
filter.test(assignBrokerAddress)) {
- final ElectMasterResponseHeader response = result.getResponse();
- response.setNewMasterAddress(assignBrokerAddress);
- response.setMasterEpoch(masterEpoch + 1);
- response.setSyncStateSetEpoch(syncStateSetEpoch);
- BrokerMemberGroup brokerMemberGroup =
buildBrokerMemberGroup(brokerName);
- if (null != brokerMemberGroup) {
- response.setBrokerMemberGroup(brokerMemberGroup);
- result.setBody(brokerMemberGroup.encode());
+ // elect by policy
+ String newMaster = electPolicy.elect(brokerInfo.getClusterName(),
syncStateSet, allReplicaBrokers, oldMaster, assignBrokerAddress);
+ if (StringUtils.isNotEmpty(newMaster) &&
newMaster.equals(oldMaster)) {
+ // old master still valid, change nothing
+ String err = String.format("The old master %s is still alive,
not need to elect new master for broker %s", oldMaster,
brokerInfo.getBrokerName());
+ log.warn("{}", err);
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+ return result;
}
- final ElectMasterEvent event = new ElectMasterEvent(brokerName,
assignBrokerAddress);
- result.addEvent(event);
- return true;
- } else if (StringUtils.isNotBlank(assignBrokerAddress) &&
!filter.test(assignBrokerAddress)) {
- return false;
- }
-
- for (final String candidate : candidates) {
- if (filter.test(candidate)) {
+ // a new master is elected
+ if (StringUtils.isNotEmpty(newMaster)) {
+ final int masterEpoch =
this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
+ final int syncStateSetEpoch =
this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
final ElectMasterResponseHeader response =
result.getResponse();
- response.setNewMasterAddress(candidate);
+ response.setNewMasterAddress(newMaster);
response.setMasterEpoch(masterEpoch + 1);
response.setSyncStateSetEpoch(syncStateSetEpoch);
BrokerMemberGroup brokerMemberGroup =
buildBrokerMemberGroup(brokerName);
@@ -247,12 +183,19 @@ public class ReplicasInfoManager {
response.setBrokerMemberGroup(brokerMemberGroup);
result.setBody(brokerMemberGroup.encode());
}
- final ElectMasterEvent event = new
ElectMasterEvent(brokerName, candidate);
+ final ElectMasterEvent event = new
ElectMasterEvent(brokerName, newMaster);
result.addEvent(event);
- return true;
+ return result;
}
+ // If elect failed, we still need to apply an ElectMasterEvent to
tell the statemachine
+ // that the master was shutdown and no new master was elected.
+ final ElectMasterEvent event = new ElectMasterEvent(false,
brokerName);
+ result.addEvent(event);
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Failed
to elect a new broker master");
+ return result;
}
- return false;
+ result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST,
"Broker metadata is not existed");
+ return result;
}
private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
@@ -269,7 +212,7 @@ public class ReplicasInfoManager {
}
public ControllerResult<RegisterBrokerToControllerResponseHeader>
registerBroker(
- final RegisterBrokerToControllerRequestHeader request) {
+ final RegisterBrokerToControllerRequestHeader request) {
final String brokerName = request.getBrokerName();
final String brokerAddress = request.getBrokerAddress();
final ControllerResult<RegisterBrokerToControllerResponseHeader>
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 5b583b493..95d4c2b10 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -114,7 +114,7 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
final RegisterBrokerToControllerResponseHeader
responseHeader = (RegisterBrokerToControllerResponseHeader)
response.readCustomHeader();
if (responseHeader != null && responseHeader.getBrokerId()
>= 0) {
this.heartbeatManager.registerBroker(controllerRequest.getClusterName(),
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(),
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
+ responseHeader.getBrokerId(),
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
controllerRequest.getEpoch(), controllerRequest.getMaxOffset());
}
return response;
}
@@ -133,7 +133,8 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
}
case BROKER_HEARTBEAT: {
final BrokerHeartbeatRequestHeader requestHeader =
(BrokerHeartbeatRequestHeader)
request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(),
requestHeader.getBrokerAddr());
+
this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
+ requestHeader.getEpoch(),
requestHeader.getMaxOffset(), requestHeader.getConfirmOffset());
return
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat
success");
}
case CONTROLLER_GET_SYNC_STATE_DATA: {
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 83a936c49..2ea427854 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -153,7 +153,7 @@ public class ControllerManagerTest {
// Send heartbeat for broker2
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
- executor.scheduleAtFixedRate(()-> {
+ executor.scheduleAtFixedRate(() -> {
final BrokerHeartbeatRequestHeader heartbeatRequestHeader = new
BrokerHeartbeatRequestHeader();
heartbeatRequestHeader.setClusterName("cluster1");
heartbeatRequestHeader.setBrokerName("broker1");
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 146fd9352..dce3167ef 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -35,6 +35,7 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -175,11 +176,22 @@ public class DLedgerControllerTest {
});
}
+ public void setBrokerElectPolicy(DLedgerController controller, String...
deathBroker) {
+ controller.setElectPolicy(new DefaultElectPolicy((clusterName,
brokerAddress) -> {
+ for (String broker : deathBroker) {
+ if (broker.equals(brokerAddress)) {
+ return false;
+ }
+ }
+ return true;
+ }, null));
+ }
+
@Test
public void testElectMaster() throws Exception {
final DLedgerController leader = mockMetaData(false);
final ElectMasterRequestHeader request = new
ElectMasterRequestHeader("broker1");
- setBrokerAlivePredicate(leader, "127.0.0.1:9000");
+ setBrokerElectPolicy(leader, "127.0.0.1:9000");
final RemotingCommand resp = leader.electMaster(request).get(10,
TimeUnit.SECONDS);
final ElectMasterResponseHeader response = (ElectMasterResponseHeader)
resp.readCustomHeader();
assertEquals(response.getMasterEpoch(), 2);
@@ -198,7 +210,7 @@ public class DLedgerControllerTest {
// Now we trigger the electMaster api, which means the old master has
shut down and we want to elect a new master.
// However, the syncStateSet in the statemachine is {"127.0.0.1:9000"},
so no other replica can be elected as master and the election will fail.
final ElectMasterRequestHeader electRequest = new
ElectMasterRequestHeader("broker1");
- setBrokerAlivePredicate(leader, "127.0.0.1:9000");
+ setBrokerElectPolicy(leader, "127.0.0.1:9000");
leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
final RemotingCommand resp = leader.getReplicaInfo(new
GetReplicaInfoRequestHeader("broker1")).
@@ -238,7 +250,7 @@ public class DLedgerControllerTest {
// However, even if the syncStateSet in statemachine is
{"127.0.0.1:9000"},
// the option {enableElectUncleanMaster = true} is set, so the controller
can still elect a new master
final ElectMasterRequestHeader electRequest = new
ElectMasterRequestHeader("broker1");
- setBrokerAlivePredicate(leader, "127.0.0.1:9000");
+ setBrokerElectPolicy(leader, "127.0.0.1:9000");
final CompletableFuture<RemotingCommand> future =
leader.electMaster(electRequest);
future.get(10, TimeUnit.SECONDS);
@@ -246,7 +258,7 @@ public class DLedgerControllerTest {
final GetReplicaInfoResponseHeader replicaInfo =
(GetReplicaInfoResponseHeader) resp.readCustomHeader();
final SyncStateSet syncStateSet =
RemotingSerializable.decode(resp.getBody(), SyncStateSet.class);
- final HashSet<String> newSyncStateSet2 = new HashSet<>();
+ final HashSet<String> newSyncStateSet2 = new HashSet<>();
newSyncStateSet2.add(replicaInfo.getMasterAddress());
assertEquals(syncStateSet.getSyncStateSet(), newSyncStateSet2);
assertNotEquals(replicaInfo.getMasterAddress(), "");
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index 5565e59bd..0f106cd71 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -44,7 +44,7 @@ public class DefaultBrokerHeartbeatManagerTest {
System.out.println("Broker shutdown:" + brokerAddress);
latch.countDown();
});
- this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:7000", 1L, 3000L, null);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:7000", 1L, 3000L, null, 1, 1L);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
this.heartbeatManager.shutdown();
}
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index ed47be2df..915e383b4 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -19,6 +19,7 @@ package
org.apache.rocketmq.controller.impl.controller.impl.manager;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
@@ -30,11 +31,15 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.elect.ElectPolicy;
+import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
+import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -47,18 +52,30 @@ import static org.junit.Assert.assertTrue;
public class ReplicasInfoManagerTest {
private ReplicasInfoManager replicasInfoManager;
+ private DefaultBrokerHeartbeatManager heartbeatManager;
+
@Before
public void init() {
final ControllerConfig config = new ControllerConfig();
config.setEnableElectUncleanMaster(false);
+ config.setScanNotActiveBrokerInterval(300000000);
this.replicasInfoManager = new ReplicasInfoManager(config);
+ this.heartbeatManager = new DefaultBrokerHeartbeatManager(config);
+ this.heartbeatManager.start();
+ }
+
+ @After
+ public void destroy() {
+ this.replicasInfoManager = null;
+ this.heartbeatManager.shutdown();
+ this.heartbeatManager = null;
}
public boolean registerNewBroker(String clusterName, String brokerName,
String brokerAddress,
boolean isFirstRegisteredBroker) {
// Register new broker
final RegisterBrokerToControllerRequestHeader registerRequest =
- new RegisterBrokerToControllerRequestHeader(clusterName,
brokerName, brokerAddress);
+ new RegisterBrokerToControllerRequestHeader(clusterName,
brokerName, brokerAddress);
final ControllerResult<RegisterBrokerToControllerResponseHeader>
registerResult = this.replicasInfoManager.registerBroker(registerRequest);
apply(registerResult.getEvents());
@@ -77,7 +94,7 @@ public class ReplicasInfoManagerTest {
private boolean alterNewInSyncSet(String brokerName, String masterAddress,
int masterEpoch,
Set<String> newSyncStateSet, int syncStateSetEpoch) {
final AlterSyncStateSetRequestHeader alterRequest =
- new AlterSyncStateSetRequestHeader(brokerName, masterAddress,
masterEpoch);
+ new AlterSyncStateSetRequestHeader(brokerName, masterAddress,
masterEpoch);
final ControllerResult<AlterSyncStateSetResponseHeader> result =
this.replicasInfoManager.alterSyncStateSet(alterRequest, new
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
apply(result.getEvents());
@@ -107,11 +124,84 @@ public class ReplicasInfoManagerTest {
assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1,
newSyncStateSet, 1));
}
+ public void mockHeartbeatDataMasterStillAlive() {
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9000", 1L, 10000000000L, null,
+ 1, 3L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 2L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L);
+ }
+
+ public void mockHeartbeatDataHigherEpoch() {
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 2L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9002", 1L, 10000000000L, null,
+ 0, 3L);
+ }
+
+
+ public void mockHeartbeatDataHigherOffset() {
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 2L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L);
+ }
+
+ @Test
+ public void testElectMasterOldMasterStillAlive() {
+ mockMetaData();
+ final ElectMasterRequestHeader request = new
ElectMasterRequestHeader("broker1");
+ ElectPolicy electPolicy = new
DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo);
+ mockHeartbeatDataMasterStillAlive();
+ final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
+ electPolicy);
+ assertEquals(ResponseCode.CONTROLLER_INVALID_REQUEST,
cResult.getResponseCode());
+ }
+
+ @Test
+ public void testElectMasterPreferHigherEpoch() {
+ mockMetaData();
+ final ElectMasterRequestHeader request = new
ElectMasterRequestHeader("broker1");
+ ElectPolicy electPolicy = new
DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo);
+ mockHeartbeatDataHigherEpoch();
+ final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
+ electPolicy);
+ System.out.println(cResult.getResponseCode());
+ final ElectMasterResponseHeader response = cResult.getResponse();
+ System.out.println(response);
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9001", response.getNewMasterAddress());
+ }
+
+ @Test
+ public void testElectMasterPreferHigherOffsetWhenEpochEquals() {
+ mockMetaData();
+ final ElectMasterRequestHeader request = new
ElectMasterRequestHeader("broker1");
+ ElectPolicy electPolicy = new
DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo);
+ mockHeartbeatDataHigherOffset();
+ final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
+ electPolicy);
+ System.out.println(cResult.getResponseCode());
+ final ElectMasterResponseHeader response = cResult.getResponse();
+ System.out.println(response);
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+ }
+
+
@Test
public void testElectMaster() {
mockMetaData();
final ElectMasterRequestHeader request = new
ElectMasterRequestHeader("broker1");
- final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request, (clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"));
+ final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
assertFalse(response.getNewMasterAddress().isEmpty());
@@ -121,20 +211,22 @@ public class ReplicasInfoManagerTest {
brokerSet.add("127.0.0.1:9000");
brokerSet.add("127.0.0.1:9001");
brokerSet.add("127.0.0.1:9002");
- final ElectMasterRequestHeader assignRequest = new
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9000");
- final ControllerResult<ElectMasterResponseHeader> cResult1 =
this.replicasInfoManager.electMaster(assignRequest, (clusterName,
brokerAddress) -> brokerAddress.contains("127.0.0.1:9000"));
- assertEquals( cResult1.getResponseCode(),
ResponseCode.CONTROLLER_INVALID_REQUEST);
-
+ final ElectMasterRequestHeader assignRequest = new
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+ final ControllerResult<ElectMasterResponseHeader> cResult1 =
this.replicasInfoManager.electMaster(assignRequest,
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
brokerAddress.contains("127.0.0.1:9000"), null));
+ assertEquals(cResult1.getResponseCode(),
ResponseCode.CONTROLLER_INVALID_REQUEST);
- final ElectMasterRequestHeader assignRequest1 = new
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
- final ControllerResult<ElectMasterResponseHeader> cResult2 =
this.replicasInfoManager.electMaster(assignRequest1, (clusterName,
brokerAddress) -> brokerAddress.equals("127.0.0.1:9000"));
- assertEquals( cResult2.getResponseCode(),
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+ final ElectMasterRequestHeader assignRequest1 = new
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+ final ControllerResult<ElectMasterResponseHeader> cResult2 =
this.replicasInfoManager.electMaster(assignRequest1,
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
brokerAddress.equals("127.0.0.1:9000"), null));
+ assertEquals(cResult2.getResponseCode(),
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
- final ElectMasterRequestHeader assignRequest2 = new
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
- final ControllerResult<ElectMasterResponseHeader> cResult3 =
this.replicasInfoManager.electMaster(assignRequest2, (clusterName,
brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
- assertEquals( cResult3.getResponseCode(), ResponseCode.SUCCESS);
+ final ElectMasterRequestHeader assignRequest2 = new
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+ final ControllerResult<ElectMasterResponseHeader> cResult3 =
this.replicasInfoManager.electMaster(assignRequest2,
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
+ assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
final ElectMasterResponseHeader response3 = cResult3.getResponse();
- assertEquals(response3.getNewMasterAddress(),"127.0.0.1:9001");
+ assertEquals(response3.getNewMasterAddress(), "127.0.0.1:9001");
assertEquals(response.getMasterEpoch(), 2);
assertFalse(response.getNewMasterAddress().isEmpty());
assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
@@ -151,7 +243,8 @@ public class ReplicasInfoManagerTest {
// Now we trigger the electMaster API, which means the old master is
shut down and we want to elect a new master.
// However, the syncStateSet in the state machine is {"127.0.0.1:9000"},
so no other replica can be elected as master and the election will fail.
final ElectMasterRequestHeader electRequest = new
ElectMasterRequestHeader("broker1");
- final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(electRequest, (clusterName, brokerAddress)
-> !brokerAddress.equals("127.0.0.1:9000"));
+ final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(electRequest,
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
final List<EventMessage> events = cResult.getEvents();
assertEquals(events.size(), 1);
final ElectMasterEvent event = (ElectMasterEvent) events.get(0);
@@ -164,4 +257,4 @@ public class ReplicasInfoManagerTest {
assertEquals(replicaInfo.getMasterEpoch(), 2);
}
-}
\ No newline at end of file
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 2ea22dc6d..0814e24ef 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -392,6 +392,10 @@ public class AutoSwitchHAService extends DefaultHAService {
return reputFromOffset;
}
+ public int getLastEpoch() {
+ return this.epochCache.lastEpoch();
+ }
+
public List<EpochEntry> getEpochEntries() {
return this.epochCache.getAllEntries();
}