This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 06ca14a50 [ISSUE #4791] Add elect master command for admin CLI (#4798)
06ca14a50 is described below
commit 06ca14a5081d067a1e61b5d716d3f17c16ef00d0
Author: mxsm <[email protected]>
AuthorDate: Thu Aug 11 10:06:46 2022 +0800
[ISSUE #4791] Add elect master command for admin CLI (#4798)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 31 ++++++++
.../controller/ElectMasterRequestHeader.java | 33 ++++++++
.../impl/manager/ReplicasInfoManager.java | 64 +++++++++++----
.../processor/ControllerRequestProcessor.java | 33 +++++---
.../impl/manager/ReplicasInfoManagerTest.java | 30 ++++++-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 20 +++--
.../tools/admin/DefaultMQAdminExtImpl.java | 10 ++-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 70 ++++++++++------
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../controller/ReElectMasterSubCommand.java | 92 ++++++++++++++++++++++
10 files changed, 330 insertions(+), 55 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c425dba29..7719b0ae6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -79,6 +79,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
@@ -185,6 +186,8 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHea
import
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -3026,4 +3029,32 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQClientException(errResponse.getCode(),
errResponse.getRemark());
}
}
+
+ public ElectMasterResponseHeader electMaster(String controllerAddr, String
clusterName, String brokerName,
+ String brokerAddr) throws MQBrokerException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
RemotingCommandException {
+
+ //get controller leader address
+ final GetMetaDataResponseHeader controllerMetaData =
this.getControllerMetaData(controllerAddr);
+ assert controllerMetaData != null;
+ assert controllerMetaData.getControllerLeaderAddress() != null;
+ final String leaderAddress =
controllerMetaData.getControllerLeaderAddress();
+ ElectMasterRequestHeader electRequestHeader = new
ElectMasterRequestHeader(clusterName, brokerName, brokerAddr);
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER,
electRequestHeader);
+ final RemotingCommand response =
this.remotingClient.invokeSync(leaderAddress, request, 3000);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ BrokerMemberGroup brokerMemberGroup =
RemotingSerializable.decode(response.getBody(), BrokerMemberGroup.class);
+ ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader)
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+ if (null != responseHeader) {
+ responseHeader.setBrokerMemberGroup(brokerMemberGroup);
+ }
+ return responseHeader;
+ }
+ default:
+ break;
+ }
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
index d96355e66..13146a4fb 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
@@ -17,11 +17,20 @@
package org.apache.rocketmq.common.protocol.header.namesrv.controller;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ElectMasterRequestHeader implements CommandCustomHeader {
+
+ @CFNotNull
+ private String clusterName;
+
+ @CFNotNull
private String brokerName;
+ @CFNotNull
+ private String brokerAddress;
+
public ElectMasterRequestHeader() {
}
@@ -29,6 +38,12 @@ public class ElectMasterRequestHeader implements
CommandCustomHeader {
this.brokerName = brokerName;
}
+ public ElectMasterRequestHeader(String clusterName, String brokerName,
String brokerAddress) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ }
+
public String getBrokerName() {
return brokerName;
}
@@ -37,10 +52,28 @@ public class ElectMasterRequestHeader implements
CommandCustomHeader {
this.brokerName = brokerName;
}
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ public void setBrokerAddress(String brokerAddress) {
+ this.brokerAddress = brokerAddress;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
@Override
public String toString() {
return "ElectMasterRequestHeader{" +
+ "clusterName='" + clusterName + '\'' +
"brokerName='" + brokerName + '\'' +
+ "brokerAddress='" + brokerAddress + '\'' +
'}';
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a53c7ca9d..c63ca0bf1 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -67,8 +68,8 @@ public class ReplicasInfoManager {
}
public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
- final AlterSyncStateSetRequestHeader request, final SyncStateSet
syncStateSet,
- final BiPredicate<String, String> brokerAlivePredicate) {
+ final AlterSyncStateSetRequestHeader request, final SyncStateSet
syncStateSet,
+ final BiPredicate<String, String> brokerAlivePredicate) {
final String brokerName = request.getBrokerName();
final ControllerResult<AlterSyncStateSetResponseHeader> result = new
ControllerResult<>(new AlterSyncStateSetResponseHeader());
final AlterSyncStateSetResponseHeader response = result.getResponse();
@@ -90,7 +91,7 @@ public class ReplicasInfoManager {
// Check master
if
(!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
String err = String.format("Rejecting alter syncStateSet
request because the current leader is:{%s}, not {%s}",
- syncStateInfo.getMasterAddress(),
request.getMasterAddress());
+ syncStateInfo.getMasterAddress(),
request.getMasterAddress());
log.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_MASTER, err);
return result;
@@ -99,7 +100,7 @@ public class ReplicasInfoManager {
// Check master epoch
if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
String err = String.format("Rejecting alter syncStateSet
request because the current master epoch is:{%d}, not {%d}",
- syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
+ syncStateInfo.getMasterEpoch(),
request.getMasterEpoch());
log.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH, err);
return result;
@@ -108,7 +109,7 @@ public class ReplicasInfoManager {
// Check syncStateSet epoch
if (syncStateSet.getSyncStateSetEpoch() !=
syncStateInfo.getSyncStateSetEpoch()) {
String err = String.format("Rejecting alter syncStateSet
request because the current syncStateSet epoch is:{%d}, not {%d}",
- syncStateInfo.getSyncStateSetEpoch(),
syncStateSet.getSyncStateSetEpoch());
+ syncStateInfo.getSyncStateSetEpoch(),
syncStateSet.getSyncStateSetEpoch());
log.error("{}", err);
result.setCodeAndRemark(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH,
err);
return result;
@@ -150,8 +151,9 @@ public class ReplicasInfoManager {
}
public ControllerResult<ElectMasterResponseHeader> electMaster(
- final ElectMasterRequestHeader request, final BiPredicate<String,
String> brokerAlivePredicate) {
+ final ElectMasterRequestHeader request, final BiPredicate<String,
String> brokerAlivePredicate) {
final String brokerName = request.getBrokerName();
+ final String assignBrokerAddress = request.getBrokerAddress();
final ControllerResult<ElectMasterResponseHeader> result = new
ControllerResult<>(new ElectMasterResponseHeader());
if (isContainsBroker(brokerName)) {
final SyncStateInfo syncStateInfo =
this.syncStateSetInfoTable.get(brokerName);
@@ -160,16 +162,26 @@ public class ReplicasInfoManager {
// First, check whether the master is still active
final String oldMaster = syncStateInfo.getMasterAddress();
if (StringUtils.isNoneEmpty(oldMaster) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), oldMaster)) {
- String err = String.format("The old master %s is still alive,
not need to elect new master for broker %s", oldMaster,
brokerInfo.getBrokerName());
- log.warn("{}", err);
-
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
- return result;
+
+ if (StringUtils.isBlank(assignBrokerAddress)) {
+ String err = String.format("The old master %s is still
alive, no need to elect new master for broker %s", oldMaster,
brokerInfo.getBrokerName());
+ log.warn("{}", err);
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+ return result;
+ }
+
+ if (StringUtils.equals(oldMaster, assignBrokerAddress)) {
+ String err = String.format("The Re-elect master is the
same as the old master %s which is still alive, no need to elect new master for
broker %s", oldMaster, brokerInfo.getBrokerName());
+ log.warn("{}", err);
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+ return result;
+ }
}
// Try elect a master in syncStateSet
if (syncStateSet.size() > 1) {
- boolean electSuccess = tryElectMaster(result, brokerName,
syncStateSet, candidate ->
- !candidate.equals(syncStateInfo.getMasterAddress()) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
+ boolean electSuccess = tryElectMaster(result, brokerName,
assignBrokerAddress, syncStateSet, candidate ->
+ !candidate.equals(syncStateInfo.getMasterAddress()) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
if (electSuccess) {
return result;
}
@@ -177,8 +189,8 @@ public class ReplicasInfoManager {
// Try elect a master in lagging replicas if
enableElectUncleanMaster = true
if (controllerConfig.isEnableElectUncleanMaster()) {
- boolean electSuccess = tryElectMaster(result, brokerName,
brokerInfo.getAllBroker(), candidate ->
- !candidate.equals(syncStateInfo.getMasterAddress()) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
+ boolean electSuccess = tryElectMaster(result, brokerName,
assignBrokerAddress, brokerInfo.getAllBroker(), candidate ->
+ !candidate.equals(syncStateInfo.getMasterAddress()) &&
brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate));
if (electSuccess) {
return result;
}
@@ -202,9 +214,28 @@ public class ReplicasInfoManager {
* @return true if elect success
*/
private boolean tryElectMaster(final
ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
- final Set<String> candidates, final Predicate<String> filter) {
+ final String assignBrokerAddress, final
Set<String> candidates, final Predicate<String> filter) {
final int masterEpoch =
this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
final int syncStateSetEpoch =
this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
+
+ //Re-elect the assigned broker as master
+ if (StringUtils.isNotBlank(assignBrokerAddress) &&
filter.test(assignBrokerAddress)) {
+ final ElectMasterResponseHeader response = result.getResponse();
+ response.setNewMasterAddress(assignBrokerAddress);
+ response.setMasterEpoch(masterEpoch + 1);
+ response.setSyncStateSetEpoch(syncStateSetEpoch);
+ BrokerMemberGroup brokerMemberGroup =
buildBrokerMemberGroup(brokerName);
+ if (null != brokerMemberGroup) {
+ response.setBrokerMemberGroup(brokerMemberGroup);
+ result.setBody(brokerMemberGroup.encode());
+ }
+ final ElectMasterEvent event = new ElectMasterEvent(brokerName,
assignBrokerAddress);
+ result.addEvent(event);
+ return true;
+ } else if (StringUtils.isNotBlank(assignBrokerAddress) &&
!filter.test(assignBrokerAddress)) {
+ return false;
+ }
+
for (final String candidate : candidates) {
if (filter.test(candidate)) {
final ElectMasterResponseHeader response =
result.getResponse();
@@ -213,6 +244,7 @@ public class ReplicasInfoManager {
response.setSyncStateSetEpoch(syncStateSetEpoch);
BrokerMemberGroup brokerMemberGroup =
buildBrokerMemberGroup(brokerName);
if (null != brokerMemberGroup) {
+ response.setBrokerMemberGroup(brokerMemberGroup);
result.setBody(brokerMemberGroup.encode());
}
final ElectMasterEvent event = new
ElectMasterEvent(brokerName, candidate);
@@ -237,7 +269,7 @@ public class ReplicasInfoManager {
}
public ControllerResult<RegisterBrokerToControllerResponseHeader>
registerBroker(
- final RegisterBrokerToControllerRequestHeader request) {
+ final RegisterBrokerToControllerRequestHeader request) {
final String brokerName = request.getBrokerName();
final String brokerAddress = request.getBrokerAddress();
final ControllerResult<RegisterBrokerToControllerResponseHeader>
result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 12824d7c3..5b583b493 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -17,21 +17,25 @@
package org.apache.rocketmq.controller.processor;
import io.netty.channel.ChannelHandlerContext;
+
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.logging.InternalLogger;
@@ -69,9 +73,9 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
if (ctx != null) {
log.debug("Receive request, {} {} {}",
- request.getCode(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- request);
+ request.getCode(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ request);
}
switch (request.getCode()) {
case CONTROLLER_ALTER_SYNC_STATE_SET: {
@@ -84,10 +88,21 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
break;
}
case CONTROLLER_ELECT_MASTER: {
- final ElectMasterRequestHeader controllerRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
- final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(controllerRequest);
+ final ElectMasterRequestHeader electMasterRequest =
(ElectMasterRequestHeader)
request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().electMaster(electMasterRequest);
if (future != null) {
- return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ final RemotingCommand response =
future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader) response.readCustomHeader();
+
+ if (null != responseHeader) {
+ if
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
+
heartbeatManager.changeBrokerMetadata(electMasterRequest.getClusterName(),
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
+ }
+ if
(this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+
this.controllerManager.notifyBrokerRoleChanged(responseHeader,
electMasterRequest.getClusterName());
+ }
+ }
+ return response;
}
break;
}
@@ -99,7 +114,7 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
final RegisterBrokerToControllerResponseHeader
responseHeader = (RegisterBrokerToControllerResponseHeader)
response.readCustomHeader();
if (responseHeader != null && responseHeader.getBrokerId()
>= 0) {
this.heartbeatManager.registerBroker(controllerRequest.getClusterName(),
controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(),
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
+ responseHeader.getBrokerId(),
controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
}
return response;
}
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 09ed9fc70..ed47be2df 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
@@ -53,7 +54,8 @@ public class ReplicasInfoManagerTest {
this.replicasInfoManager = new ReplicasInfoManager(config);
}
- public boolean registerNewBroker(String clusterName, String brokerName,
String brokerAddress, boolean isFirstRegisteredBroker) {
+ public boolean registerNewBroker(String clusterName, String brokerName,
String brokerAddress,
+ boolean isFirstRegisteredBroker) {
// Register new broker
final RegisterBrokerToControllerRequestHeader registerRequest =
new RegisterBrokerToControllerRequestHeader(clusterName,
brokerName, brokerAddress);
@@ -72,7 +74,8 @@ public class ReplicasInfoManagerTest {
return true;
}
- private boolean alterNewInSyncSet(String brokerName, String masterAddress,
int masterEpoch, Set<String> newSyncStateSet, int syncStateSetEpoch) {
+ private boolean alterNewInSyncSet(String brokerName, String masterAddress,
int masterEpoch,
+ Set<String> newSyncStateSet, int syncStateSetEpoch) {
final AlterSyncStateSetRequestHeader alterRequest =
new AlterSyncStateSetRequestHeader(brokerName, masterAddress,
masterEpoch);
final ControllerResult<AlterSyncStateSetResponseHeader> result =
this.replicasInfoManager.alterSyncStateSet(alterRequest, new
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
@@ -113,6 +116,29 @@ public class ReplicasInfoManagerTest {
assertEquals(response.getMasterEpoch(), 2);
assertFalse(response.getNewMasterAddress().isEmpty());
assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+
+ final Set<String> brokerSet = new HashSet<>();
+ brokerSet.add("127.0.0.1:9000");
+ brokerSet.add("127.0.0.1:9001");
+ brokerSet.add("127.0.0.1:9002");
+ final ElectMasterRequestHeader assignRequest = new
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9000");
+ final ControllerResult<ElectMasterResponseHeader> cResult1 =
this.replicasInfoManager.electMaster(assignRequest, (clusterName,
brokerAddress) -> brokerAddress.contains("127.0.0.1:9000"));
+ assertEquals( cResult1.getResponseCode(),
ResponseCode.CONTROLLER_INVALID_REQUEST);
+
+
+ final ElectMasterRequestHeader assignRequest1 = new
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
+ final ControllerResult<ElectMasterResponseHeader> cResult2 =
this.replicasInfoManager.electMaster(assignRequest1, (clusterName,
brokerAddress) -> brokerAddress.equals("127.0.0.1:9000"));
+ assertEquals( cResult2.getResponseCode(),
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+
+ final ElectMasterRequestHeader assignRequest2 = new
ElectMasterRequestHeader("cluster1","broker1", "127.0.0.1:9001");
+ final ControllerResult<ElectMasterResponseHeader> cResult3 =
this.replicasInfoManager.electMaster(assignRequest2, (clusterName,
brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"));
+ assertEquals( cResult3.getResponseCode(), ResponseCode.SUCCESS);
+ final ElectMasterResponseHeader response3 = cResult3.getResponse();
+ assertEquals(response3.getNewMasterAddress(),"127.0.0.1:9001");
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+
}
@Test
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 3356f94f7..0b550b56b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -53,6 +53,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -208,7 +209,8 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
@Override
public void updateGlobalWhiteAddrConfig(String addr,
- String globalWhiteAddrs, String aclFileFullPath) throws
RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ String globalWhiteAddrs,
+ String aclFileFullPath) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr,
globalWhiteAddrs, aclFileFullPath);
}
@@ -333,7 +335,8 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public ProducerTableInfo getAllProducerInfo(final String brokerAddr)
throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
+ public ProducerTableInfo getAllProducerInfo(
+ final String brokerAddr) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.getAllProducerInfo(brokerAddr);
}
@@ -534,7 +537,8 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public boolean deleteExpiredCommitLogByAddr(String addr) throws
RemotingConnectException, RemotingSendRequestException,
+ public boolean deleteExpiredCommitLogByAddr(
+ String addr) throws RemotingConnectException,
RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
return defaultMQAdminExtImpl.deleteExpiredCommitLogByAddr(addr);
}
@@ -787,7 +791,6 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr,
masterFlushOffset);
}
-
public QueryResult queryMessageByUniqKey(String topic, String key, int
maxNum, long begin, long end)
throws MQClientException, InterruptedException {
@@ -808,7 +811,8 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public Map<String, Properties> getControllerConfig(List<String>
controllerServers) throws InterruptedException, RemotingTimeoutException,
+ public Map<String, Properties> getControllerConfig(
+ List<String> controllerServers) throws InterruptedException,
RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
MQClientException,
UnsupportedEncodingException {
return
this.defaultMQAdminExtImpl.getControllerConfig(controllerServers);
@@ -819,4 +823,10 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
List<String> controllers) throws InterruptedException,
RemotingConnectException, UnsupportedEncodingException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
MQBrokerException {
this.defaultMQAdminExtImpl.updateControllerConfig(properties,
controllers);
}
+
+ @Override
+ public ElectMasterResponseHeader electMaster(String controllerAddr, String
clusterName,
+ String brokerName, String brokerAddr) throws RemotingException,
InterruptedException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.electMaster(controllerAddr,
clusterName, brokerName, brokerAddr);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index ee28c876e..3cea455a2 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -60,6 +60,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
@@ -1793,6 +1794,12 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr,
masterFlushOffset);
}
+ @Override
+ public ElectMasterResponseHeader electMaster(String controllerAddr, String
clusterName,
+ String brokerName, String brokerAddr) throws RemotingException,
InterruptedException, MQBrokerException {
+ return
this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr,
clusterName, brokerName, brokerAddr);
+ }
+
@Override
public GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr,
String groupName, String topicName,
Boolean readable) throws RemotingException, InterruptedException,
MQBrokerException {
@@ -1823,7 +1830,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
return
this.mqClientInstance.getMQClientAPIImpl().getControllerConfig(controllerServers,
timeoutMillis);
}
- @Override public void updateControllerConfig(Properties properties,
+ @Override
+ public void updateControllerConfig(Properties properties,
List<String> controllers) throws InterruptedException,
RemotingConnectException, UnsupportedEncodingException,
RemotingSendRequestException, RemotingTimeoutException,
MQClientException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().updateControllerConfig(properties,
controllers, timeoutMillis);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 9fe6ea4fe..857f159b5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -99,7 +100,8 @@ public interface MQAdminExt extends MQAdmin {
final String globalWhiteAddrs) throws RemotingException,
MQBrokerException,
InterruptedException, MQClientException;
- void updateGlobalWhiteAddrConfig(final String addr, final String
globalWhiteAddrs, String aclFileFullPath) throws RemotingException,
MQBrokerException,
+ void updateGlobalWhiteAddrConfig(final String addr, final String
globalWhiteAddrs,
+ String aclFileFullPath) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
@@ -113,14 +115,16 @@ public interface MQAdminExt extends MQAdmin {
final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
- SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr,
final String group) throws InterruptedException, RemotingException,
MQClientException, MQBrokerException;
+ SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr,
+ final String group) throws InterruptedException, RemotingException,
MQClientException, MQBrokerException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException,
InterruptedException,
MQBrokerException;
- TopicStatsTable examineTopicStats(String brokerAddr, final String topic)
throws RemotingException, MQClientException, InterruptedException,
- MQBrokerException;
+ TopicStatsTable examineTopicStats(String brokerAddr,
+ final String topic) throws RemotingException, MQClientException,
InterruptedException,
+ MQBrokerException;
AdminToolResult<TopicStatsTable> examineTopicStatsConcurrent(String topic);
@@ -162,7 +166,7 @@ public interface MQAdminExt extends MQAdmin {
MQClientException, InterruptedException, MQBrokerException;
ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws
RemotingException,
- MQClientException, InterruptedException, MQBrokerException;
+ MQClientException, InterruptedException, MQBrokerException;
List<String> getNameServerAddressList();
@@ -170,7 +174,7 @@ public interface MQAdminExt extends MQAdmin {
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQClientException;
int addWritePermOfBroker(final String namesrvAddr, String brokerName)
throws RemotingCommandException,
- RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQClientException;
+ RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQClientException;
void putKVConfig(final String namespace, final String key, final String
value);
@@ -200,7 +204,8 @@ public interface MQAdminExt extends MQAdmin {
void deleteSubscriptionGroup(final String addr, String groupName) throws
RemotingException, MQBrokerException,
InterruptedException, MQClientException;
- void deleteSubscriptionGroup(final String addr, String groupName, boolean
removeOffset) throws RemotingException, MQBrokerException,
+ void deleteSubscriptionGroup(final String addr, String groupName,
+ boolean removeOffset) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void createAndUpdateKvConfig(String namespace, String key,
@@ -233,11 +238,13 @@ public interface MQAdminExt extends MQAdmin {
GroupList queryTopicConsumeByWho(final String topic) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException,
RemotingException, MQClientException;
- TopicList queryTopicsByConsumer(final String group) throws
InterruptedException, MQBrokerException, RemotingException, MQClientException;
+ TopicList queryTopicsByConsumer(
+ final String group) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException;
AdminToolResult<TopicList> queryTopicsByConsumerConcurrent(final String
group);
- SubscriptionData querySubscription(final String group, final String topic)
throws InterruptedException, MQBrokerException, RemotingException,
MQClientException;
+ SubscriptionData querySubscription(final String group,
+ final String topic) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException;
List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
final String group) throws InterruptedException, MQBrokerException,
@@ -266,7 +273,8 @@ public interface MQAdminExt extends MQAdmin {
ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup,
final String clientId, final boolean jstack)
throws RemotingException, MQClientException, InterruptedException;
- ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup,
final String clientId, final boolean jstack, final boolean metrics)
+ ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup,
final String clientId, final boolean jstack,
+ final boolean metrics)
throws RemotingException, MQClientException, InterruptedException;
ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
@@ -311,7 +319,6 @@ public interface MQAdminExt extends MQAdmin {
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
-
TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
@@ -368,8 +375,8 @@ public interface MQAdminExt extends MQAdmin {
boolean resumeCheckHalfMessage(final String topic,
final String msgId) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
- void setMessageRequestMode(final String brokerAddr, final String topic,
final String consumerGroup, final
- MessageRequestMode mode, final int popWorkGroupSize, final long
timeoutMillis)
+ void setMessageRequestMode(final String brokerAddr, final String topic,
final String consumerGroup,
+ final MessageRequestMode mode, final int popWorkGroupSize, final long
timeoutMillis)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
RemotingConnectException, MQClientException;
@@ -383,15 +390,16 @@ public interface MQAdminExt extends MQAdmin {
throws RemotingException, InterruptedException, MQBrokerException;
TopicConfig examineTopicConfig(final String addr,
- final String topic) throws
InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException;
+ final String topic) throws InterruptedException, MQBrokerException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException;
- void createStaticTopic(final String addr, final String defaultTopic, final
TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final
boolean force) throws RemotingException, InterruptedException,
MQBrokerException;
+ void createStaticTopic(final String addr, final String defaultTopic, final
TopicConfig topicConfig,
+ final TopicQueueMappingDetail mappingDetail,
+ final boolean force) throws RemotingException, InterruptedException,
MQBrokerException;
GroupForbidden updateAndGetGroupReadForbidden(String brokerAddr, String
groupName, String topicName,
Boolean readable)
throws RemotingException, InterruptedException, MQBrokerException;
-
MessageExt queryMessage(String clusterName,
String topic,
String msgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
@@ -399,21 +407,24 @@ public interface MQAdminExt extends MQAdmin {
HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws
RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException;
- InSyncStateData getInSyncStateData(String controllerAddress, List<String>
brokers) throws RemotingException, InterruptedException, MQBrokerException;
+ InSyncStateData getInSyncStateData(String controllerAddress,
+ List<String> brokers) throws RemotingException, InterruptedException,
MQBrokerException;
- EpochEntryCache getBrokerEpochCache(String brokerAddr) throws
RemotingException, InterruptedException, MQBrokerException;
+ EpochEntryCache getBrokerEpochCache(
+ String brokerAddr) throws RemotingException, InterruptedException,
MQBrokerException;
+
+ GetMetaDataResponseHeader getControllerMetaData(
+ String controllerAddr) throws RemotingException, InterruptedException,
MQBrokerException;
- GetMetaDataResponseHeader getControllerMetaData(String controllerAddr)
throws RemotingException, InterruptedException, MQBrokerException;
/**
* Reset master flush offset in slave
*
- * @param brokerAddr slave broker address
+ * @param brokerAddr slave broker address
* @param masterFlushOffset master flush offset
*/
void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset)
throws InterruptedException, MQBrokerException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException;
-
/**
* Get controller config.
* <br>
@@ -421,7 +432,8 @@ public interface MQAdminExt extends MQAdmin {
*
* @return The fetched controller config
*/
- Map<String, Properties> getControllerConfig(List<String>
controllerServers) throws InterruptedException, RemotingTimeoutException,
+ Map<String, Properties> getControllerConfig(
+ List<String> controllerServers) throws InterruptedException,
RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
MQClientException, UnsupportedEncodingException;
/**
@@ -433,4 +445,18 @@ public interface MQAdminExt extends MQAdmin {
final List<String> controllers) throws InterruptedException,
RemotingConnectException,
UnsupportedEncodingException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, MQBrokerException;
+ /**
+ * manual trigger broker elect master
+ *
+ * @param controllerAddr controller address
+ * @param brokerName broker name
+ * @param brokerAddr broker address
+ * @return
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQBrokerException
+ */
+ ElectMasterResponseHeader electMaster(String controllerAddr, String
clusterName, String brokerName,
+ String brokerAddr)
+ throws RemotingException, InterruptedException, MQBrokerException;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index fc9226d8a..2673c6747 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -63,6 +63,7 @@ import
org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
import
org.apache.rocketmq.tools.command.controller.GetControllerConfigSubCommand;
import
org.apache.rocketmq.tools.command.controller.GetControllerMetaDataSubCommand;
import
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
+import org.apache.rocketmq.tools.command.controller.ReElectMasterSubCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
@@ -264,6 +265,7 @@ public class MQAdminStartup {
initCommand(new GetControllerConfigSubCommand());
initCommand(new UpdateControllerConfigSubCommand());
+ initCommand(new ReElectMasterSubCommand());
}
private static void initLogback() throws JoranException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
new file mode 100644
index 000000000..47e8dfadc
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.controller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class ReElectMasterSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "electMaster";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Re-elect the specified broker as master";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("a", "controllerAddress", true, "The address
of controller");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("b", "brokerAddress", true, "The address of the
broker which requires to become master");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("n", "brokerName", true, "The broker name of the
replicas that require to be manipulated");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "the clusterName of
broker");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ String controllerAddress = commandLine.getOptionValue("a").trim();
+ String clusterName = commandLine.getOptionValue('c').trim();
+ String brokerName = commandLine.getOptionValue('n').trim();
+ String brokerAddress = commandLine.getOptionValue("b").trim();
+
+ try {
+ defaultMQAdminExt.start();
+ final ElectMasterResponseHeader metaData =
defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName,
brokerAddress);
+ System.out.printf("\n#ClusterName\t%s", clusterName);
+ System.out.printf("\n#BrokerName\t%s", brokerName);
+ System.out.printf("\n#BrokerMasterAddr\t%s",
metaData.getNewMasterAddress());
+ System.out.printf("\n#MasterEpoch\t%s", metaData.getMasterEpoch());
+ System.out.printf("\n#SyncStateSetEpoch\t%s\n",
metaData.getSyncStateSetEpoch());
+ BrokerMemberGroup brokerMemberGroup =
metaData.getBrokerMemberGroup();
+ if (null != brokerMemberGroup && null !=
brokerMemberGroup.getBrokerAddrs()) {
+ brokerMemberGroup.getBrokerAddrs().forEach((key, value) ->
System.out.printf("\t#Broker\t%d\t%s\n", key, value));
+ }
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+
+ }
+}