This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a961d1b44 [ISSUE #4817] Add a command to clear broker data from
controller for CLI (#4823)
a961d1b44 is described below
commit a961d1b44a23af4abf0783931d362c071037135b
Author: mxsm <[email protected]>
AuthorDate: Sat Aug 27 19:42:55 2022 +0800
[ISSUE #4817] Add a command to clear broker data from controller for CLI
(#4823)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 33 ++++++-
.../rocketmq/common/protocol/RequestCode.java | 7 ++
.../rocketmq/common/protocol/ResponseCode.java | 2 +
.../CleanControllerBrokerDataRequestHeader.java | 99 ++++++++++++++++++++
.../org/apache/rocketmq/controller/Controller.java | 9 +-
.../rocketmq/controller/ControllerManager.java | 1 +
.../controller/impl/DLedgerController.java | 28 +++---
.../impl/DefaultBrokerHeartbeatManager.java | 2 +-
.../impl/event/CleanBrokerDataEvent.java | 64 +++++++++++++
.../controller/impl/event/EventSerializer.java | 2 +
.../rocketmq/controller/impl/event/EventType.java | 5 +-
.../controller/impl/manager/BrokerInfo.java | 4 +
.../impl/manager/ReplicasInfoManager.java | 81 +++++++++++++++-
.../controller/impl/manager/SyncStateInfo.java | 4 +
.../processor/ControllerRequestProcessor.java | 18 +++-
.../impl/manager/ReplicasInfoManagerTest.java | 75 ++++++++++-----
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 ++
.../tools/admin/DefaultMQAdminExtImpl.java | 15 ++-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 12 ++-
.../rocketmq/tools/command/MQAdminStartup.java | 10 +-
.../CleanControllerBrokerDataSubCommand.java | 103 +++++++++++++++++++++
21 files changed, 522 insertions(+), 58 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7de364bcb..aa2280591 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl;
+import com.alibaba.fastjson.JSON;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -96,9 +97,9 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody;
import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
@@ -185,6 +186,7 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHea
import
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
@@ -193,10 +195,10 @@ import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpchook.DynamicalExtFieldRPCHook;
+import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.GroupForbidden;
-import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
@@ -217,8 +219,8 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
import static
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
-import com.alibaba.fastjson.JSON;
public class MQClientAPIImpl implements NameServerUpdateCallback {
private final static InternalLogger log = ClientLogger.getLog();
@@ -3058,4 +3060,29 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+
+ public void cleanControllerBrokerData(String controllerAddr, String
clusterName,
+ String brokerName, String brokerAddr, boolean isCleanLivingBroker)
+ throws RemotingException, InterruptedException, MQBrokerException {
+
+ //get controller leader address
+ final GetMetaDataResponseHeader controllerMetaData =
this.getControllerMetaData(controllerAddr);
+ assert controllerMetaData != null;
+ assert controllerMetaData.getControllerLeaderAddress() != null;
+ final String leaderAddress =
controllerMetaData.getControllerLeaderAddress();
+
+ CleanControllerBrokerDataRequestHeader cleanHeader = new
CleanControllerBrokerDataRequestHeader(clusterName, brokerName, brokerAddr,
isCleanLivingBroker);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CLEAN_BROKER_DATA,
cleanHeader);
+
+ final RemotingCommand response =
this.remotingClient.invokeSync(leaderAddress, request, 3000);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return;
+ }
+ default:
+ break;
+ }
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 9441e4fd0..1d3393b57 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -269,4 +269,11 @@ public class RequestCode {
* get config from controller
*/
public static final int GET_CONTROLLER_CONFIG = 1010;
+
+ /**
+ * clean broker data
+ */
+ public static final int CLEAN_BROKER_DATA = 1011;
+
+
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index f68bac279..ac8a286ee 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -111,4 +111,6 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int CONTROLLER_BROKER_METADATA_NOT_EXIST = 2008;
+ public static final int CONTROLLER_INVALID_CLEAN_BROKER_METADATA = 2009;
+
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/CleanControllerBrokerDataRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/CleanControllerBrokerDataRequestHeader.java
new file mode 100644
index 000000000..2600a52f0
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/CleanControllerBrokerDataRequestHeader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class CleanControllerBrokerDataRequestHeader implements
CommandCustomHeader {
+
+ @CFNullable
+ private String clusterName;
+
+ @CFNotNull
+ private String brokerName;
+
+ @CFNullable
+ private String brokerAddress;
+
+ private boolean isCleanLivingBroker = false;
+
+ public CleanControllerBrokerDataRequestHeader() {
+ }
+
+ public CleanControllerBrokerDataRequestHeader(String clusterName, String
brokerName, String brokerAddress,
+ boolean isCleanLivingBroker) {
+ this.clusterName = clusterName;
+ this.brokerName = brokerName;
+ this.brokerAddress = brokerAddress;
+ this.isCleanLivingBroker = isCleanLivingBroker;
+ }
+
+ public CleanControllerBrokerDataRequestHeader(String clusterName, String
brokerName, String brokerAddress) {
+ this(clusterName, brokerName, brokerAddress, false);
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ public void setBrokerAddress(String brokerAddress) {
+ this.brokerAddress = brokerAddress;
+ }
+
+ public boolean isCleanLivingBroker() {
+ return isCleanLivingBroker;
+ }
+
+ public void setCleanLivingBroker(boolean cleanLivingBroker) {
+ isCleanLivingBroker = cleanLivingBroker;
+ }
+
+ @Override
+ public String toString() {
+ return "CleanControllerBrokerDataRequestHeader{" +
+ "clusterName='" + clusterName + '\'' +
+ "brokerName='" + brokerName + '\'' +
+ "brokerAddress='" + brokerAddress + '\'' +
+ "isCleanLivingBroker='" + isCleanLivingBroker + '\'' +
+ '}';
+ }
+}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 1cd724307..628469e67 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -21,9 +21,10 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -106,4 +107,10 @@ public interface Controller {
* Get the remotingServer used by the controller, the upper layer will
reuse this remotingServer.
*/
RemotingServer getRemotingServer();
+
+ /**
+ * Clean controller broker data
+ *
+ */
+ CompletableFuture<RemotingCommand> cleanBrokerData(final
CleanControllerBrokerDataRequestHeader requestHeader);
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index cd4a60158..f8ae7eb4f 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -189,6 +189,7 @@ public class ControllerManager {
controllerRemotingServer.registerProcessor(RequestCode.BROKER_HEARTBEAT,
controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.UPDATE_CONTROLLER_CONFIG,
controllerRequestProcessor, this.controllerRequestExecutor);
controllerRemotingServer.registerProcessor(RequestCode.GET_CONTROLLER_CONFIG,
controllerRequestProcessor, this.controllerRequestExecutor);
+
controllerRemotingServer.registerProcessor(RequestCode.CLEAN_BROKER_DATA,
controllerRequestProcessor, this.controllerRequestExecutor);
}
public void start() {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index c6a8b3345..1e34aad97 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -37,18 +37,18 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
-
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
@@ -84,7 +84,6 @@ public class DLedgerController implements Controller {
// use for elect a master
private ElectPolicy electPolicy;
-
private AtomicBoolean isScheduling = new AtomicBoolean(false);
public DLedgerController(final ControllerConfig config, final
BiPredicate<String, String> brokerAlivePredicate) {
@@ -92,9 +91,9 @@ public class DLedgerController implements Controller {
}
public DLedgerController(final ControllerConfig controllerConfig,
- final BiPredicate<String, String>
brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
- final NettyClientConfig nettyClientConfig, final
ChannelEventListener channelEventListener,
- final ElectPolicy electPolicy) {
+ final BiPredicate<String, String> brokerAlivePredicate, final
NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig, final ChannelEventListener
channelEventListener,
+ final ElectPolicy electPolicy) {
this.controllerConfig = controllerConfig;
this.eventSerializer = new EventSerializer();
this.scheduler = new EventScheduler();
@@ -154,7 +153,7 @@ public class DLedgerController implements Controller {
@Override
public CompletableFuture<RemotingCommand>
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
- final
SyncStateSet syncStateSet) {
+ final SyncStateSet syncStateSet) {
return this.scheduler.appendEvent("alterSyncStateSet",
() -> this.replicasInfoManager.alterSyncStateSet(request,
syncStateSet, this.brokerAlivePredicate), true);
}
@@ -193,7 +192,7 @@ public class DLedgerController implements Controller {
sb.append(peer).append(";");
}
return
RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new
GetMetaDataResponseHeader(
- state.getGroup(), state.getLeaderId(), state.getLeaderAddr(),
state.isLeader(), sb.toString()));
+ state.getGroup(), state.getLeaderId(), state.getLeaderAddr(),
state.isLeader(), sb.toString()));
}
@Override
@@ -201,6 +200,13 @@ public class DLedgerController implements Controller {
return this.dLedgerServer.getRemotingServer();
}
+ @Override
+ public CompletableFuture<RemotingCommand> cleanBrokerData(
+ final CleanControllerBrokerDataRequestHeader requestHeader) {
+ return this.scheduler.appendEvent("cleanBrokerData",
+ () -> this.replicasInfoManager.cleanBrokerData(requestHeader,
this.brokerAlivePredicate), true);
+ }
+
/**
* Append the request to dledger, wait the dledger to commit the request.
*/
@@ -288,7 +294,7 @@ public class DLedgerController implements Controller {
}
public <T> CompletableFuture<RemotingCommand> appendEvent(final String
name,
- final
Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
+ final Supplier<ControllerResult<T>> supplier, boolean
isWriteEvent) {
if (isStopped() ||
!DLedgerController.this.roleHandler.isLeaderState()) {
final RemotingCommand command =
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The
controller is not in leader state");
final CompletableFuture<RemotingCommand> future = new
CompletableFuture<>();
@@ -325,7 +331,7 @@ public class DLedgerController implements Controller {
private final boolean isWriteEvent;
ControllerEventHandler(final String name, final
Supplier<ControllerResult<T>> supplier,
- final boolean isWriteEvent) {
+ final boolean isWriteEvent) {
this.name = name;
this.supplier = supplier;
this.future = new CompletableFuture<>();
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index e56d97c99..c369d8353 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -30,9 +30,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerAddrInfo;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.BrokerLiveInfo;
import org.apache.rocketmq.logging.InternalLogger;
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
new file mode 100644
index 000000000..4678f90c4
--- /dev/null
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.impl.event;
+
+import java.util.Set;
+
+public class CleanBrokerDataEvent implements EventMessage {
+
+ private String brokerName;
+
+ private Set<String> brokerAddressSet;
+
+ public CleanBrokerDataEvent(String brokerName, Set<String>
brokerAddressSet) {
+ this.brokerName = brokerName;
+ this.brokerAddressSet = brokerAddressSet;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public Set<String> getBrokerAddressSet() {
+ return brokerAddressSet;
+ }
+
+ public void setBrokerAddressSet(Set<String> brokerAddressSet) {
+ this.brokerAddressSet = brokerAddressSet;
+ }
+
+ /**
+ * Returns the event type of this message
+ */
+ @Override
+ public EventType getEventType() {
+ return EventType.CLEAN_BROKER_DATA_EVENT;
+ }
+
+ @Override
+ public String toString() {
+ return "CleanBrokerDataEvent{" +
+ "brokerName='" + brokerName + '\'' +
+ ", brokerAddressSet=" + brokerAddressSet +
+ '}';
+ }
+}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
index f78f399a5..d49616f2d 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
@@ -67,6 +67,8 @@ public class EventSerializer {
return this.serializer.deserialize(data,
ApplyBrokerIdEvent.class);
case ELECT_MASTER_EVENT:
return this.serializer.deserialize(data,
ElectMasterEvent.class);
+ case CLEAN_BROKER_DATA_EVENT:
+ return this.serializer.deserialize(data,
CleanBrokerDataEvent.class);
default:
break;
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
index f6971cbdf..6f100438e 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
@@ -23,7 +23,8 @@ public enum EventType {
ALTER_SYNC_STATE_SET_EVENT("AlterSyncStateSetEvent", (short) 1),
APPLY_BROKER_ID_EVENT("ApplyBrokerIdEvent", (short) 2),
ELECT_MASTER_EVENT("ElectMasterEvent", (short) 3),
- READ_EVENT("ReadEvent", (short) 4);
+ READ_EVENT("ReadEvent", (short) 4),
+ CLEAN_BROKER_DATA_EVENT("CleanBrokerDataEvent", (short) 5);
private final String name;
private final short id;
@@ -43,6 +44,8 @@ public enum EventType {
return ELECT_MASTER_EVENT;
case 4:
return READ_EVENT;
+ case 5:
+ return CLEAN_BROKER_DATA_EVENT;
}
return null;
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index 2c283c08c..d6e203ca6 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -38,6 +38,10 @@ public class BrokerInfo {
this.brokerIdTable = new HashMap<>();
}
+ public void removeBrokerAddress(final String address) {
+ this.brokerIdTable.remove(address);
+ }
+
public long newBrokerId() {
return this.brokerIdCount.incrementAndGet();
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index ca36ae2e3..683a17d4e 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -23,26 +23,29 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.common.protocol.body.InSyncStateData;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.controller.impl.event.CleanBrokerDataEvent;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
import org.apache.rocketmq.controller.impl.event.EventMessage;
@@ -150,7 +153,8 @@ public class ReplicasInfoManager {
return result;
}
- public ControllerResult<ElectMasterResponseHeader> electMaster(final
ElectMasterRequestHeader request, final ElectPolicy electPolicy) {
+ public ControllerResult<ElectMasterResponseHeader> electMaster(final
ElectMasterRequestHeader request,
+ final ElectPolicy electPolicy) {
final String brokerName = request.getBrokerName();
final String assignBrokerAddress = request.getBrokerAddress();
final ControllerResult<ElectMasterResponseHeader> result = new
ControllerResult<>(new ElectMasterResponseHeader());
@@ -316,6 +320,43 @@ public class ReplicasInfoManager {
return result;
}
+ public ControllerResult<Void> cleanBrokerData(final
CleanControllerBrokerDataRequestHeader requestHeader,
+ final BiPredicate<String, String> brokerAlivePredicate) {
+ final ControllerResult<Void> result = new ControllerResult<>();
+
+ final String clusterName = requestHeader.getClusterName();
+ final String brokerName = requestHeader.getBrokerName();
+ final String brokerAddrs = requestHeader.getBrokerAddress();
+
+ Set<String> brokerAddressSet = null;
+ if (!requestHeader.isCleanLivingBroker()) {
+ //if SyncStateInfo.masterAddress is not empty, at least one broker
with the same BrokerName is alive
+ SyncStateInfo syncStateInfo =
this.syncStateSetInfoTable.get(brokerName);
+ if (StringUtils.isBlank(brokerAddrs) && null != syncStateInfo &&
StringUtils.isNotEmpty(syncStateInfo.getMasterAddress())) {
+ String remark = String.format("Broker %s is still alive, clean
up failure", requestHeader.getBrokerName());
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
remark);
+ return result;
+ }
+ if (StringUtils.isNotBlank(brokerAddrs)) {
+ brokerAddressSet =
Stream.of(brokerAddrs.split(";")).collect(Collectors.toSet());
+ for (String brokerAddr : brokerAddressSet) {
+ if (brokerAlivePredicate.test(clusterName, brokerAddr)) {
+ String remark = String.format("Broker [%s, %s] is
still alive, clean up failure", requestHeader.getBrokerName(), brokerAddr);
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
remark);
+ return result;
+ }
+ }
+ }
+ }
+ if (isContainsBroker(brokerName)) {
+ final CleanBrokerDataEvent event = new
CleanBrokerDataEvent(brokerName, brokerAddressSet);
+ result.addEvent(event);
+ return result;
+ }
+
result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
String.format("Broker %s is not existed,clean broker data failure.",
brokerName));
+ return result;
+ }
+
/**
* Apply events to memory statemachine.
*
@@ -333,6 +374,9 @@ public class ReplicasInfoManager {
case ELECT_MASTER_EVENT:
handleElectMaster((ElectMasterEvent) event);
break;
+ case CLEAN_BROKER_DATA_EVENT:
+ handleCleanBrokerDataEvent((CleanBrokerDataEvent) event);
+ break;
default:
break;
}
@@ -387,6 +431,33 @@ public class ReplicasInfoManager {
}
}
+ private void handleCleanBrokerDataEvent(final CleanBrokerDataEvent event) {
+
+ final String brokerName = event.getBrokerName();
+ final Set<String> brokerAddressSet = event.getBrokerAddressSet();
+
+ if (null == brokerAddressSet || brokerAddressSet.isEmpty()) {
+ this.replicaInfoTable.remove(brokerName);
+ this.syncStateSetInfoTable.remove(brokerName);
+ return;
+ }
+ if (!isContainsBroker(brokerName)) {
+ return;
+ }
+ final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+ final SyncStateInfo syncStateInfo =
this.syncStateSetInfoTable.get(brokerName);
+ for (String brokerAddress : brokerAddressSet) {
+ brokerInfo.removeBrokerAddress(brokerAddress);
+ syncStateInfo.removeSyncState(brokerAddress);
+ }
+ if (brokerInfo.getBrokerIdTable().isEmpty()) {
+ this.replicaInfoTable.remove(brokerName);
+ }
+ if (syncStateInfo.getSyncStateSet().isEmpty()) {
+ this.syncStateSetInfoTable.remove(brokerName);
+ }
+ }
+
/**
* Is the broker existed in the memory metadata
*
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 346d4ccdd..997dee3c5 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -79,4 +79,8 @@ public class SyncStateInfo {
public int getMasterEpoch() {
return masterEpoch;
}
+
+ public void removeSyncState(final String address) {
+ syncStateSet.remove(address);
+ }
}
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 95d4c2b10..956151975 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -17,13 +17,11 @@
package org.apache.rocketmq.controller.processor;
import io.netty.channel.ChannelHandlerContext;
-
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,6 +29,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
@@ -46,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import static org.apache.rocketmq.common.protocol.RequestCode.BROKER_HEARTBEAT;
+import static
org.apache.rocketmq.common.protocol.RequestCode.CLEAN_BROKER_DATA;
import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
import static
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
@@ -73,9 +73,9 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws Exception {
if (ctx != null) {
log.debug("Receive request, {} {} {}",
- request.getCode(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- request);
+ request.getCode(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ request);
}
switch (request.getCode()) {
case CONTROLLER_ALTER_SYNC_STATE_SET: {
@@ -153,6 +153,13 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
return this.updateControllerConfig(ctx, request);
case GET_CONTROLLER_CONFIG:
return this.getControllerConfig(ctx, request);
+ case CLEAN_BROKER_DATA:
+ final CleanControllerBrokerDataRequestHeader requestHeader =
(CleanControllerBrokerDataRequestHeader)
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+ final CompletableFuture<RemotingCommand> future =
this.controllerManager.getController().cleanBrokerData(requestHeader);
+ if (null != future) {
+ return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+ }
+ break;
default: {
final String error = " request type " + request.getCode() + "
not supported";
return
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
@@ -220,4 +227,5 @@ public class ControllerRequestProcessor implements
NettyRequestProcessor {
response.setRemark(null);
return response;
}
+
}
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 915e383b4..127f07f79 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
@@ -75,7 +76,7 @@ public class ReplicasInfoManagerTest {
boolean isFirstRegisteredBroker) {
// Register new broker
final RegisterBrokerToControllerRequestHeader registerRequest =
- new RegisterBrokerToControllerRequestHeader(clusterName,
brokerName, brokerAddress);
+ new RegisterBrokerToControllerRequestHeader(clusterName,
brokerName, brokerAddress);
final ControllerResult<RegisterBrokerToControllerResponseHeader>
registerResult = this.replicasInfoManager.registerBroker(registerRequest);
apply(registerResult.getEvents());
@@ -94,7 +95,7 @@ public class ReplicasInfoManagerTest {
private boolean alterNewInSyncSet(String brokerName, String masterAddress,
int masterEpoch,
Set<String> newSyncStateSet, int syncStateSetEpoch) {
final AlterSyncStateSetRequestHeader alterRequest =
- new AlterSyncStateSetRequestHeader(brokerName, masterAddress,
masterEpoch);
+ new AlterSyncStateSetRequestHeader(brokerName, masterAddress,
masterEpoch);
final ControllerResult<AlterSyncStateSetResponseHeader> result =
this.replicasInfoManager.alterSyncStateSet(alterRequest, new
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
apply(result.getEvents());
@@ -126,30 +127,29 @@ public class ReplicasInfoManagerTest {
public void mockHeartbeatDataMasterStillAlive() {
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9000", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L);
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L);
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L);
}
public void mockHeartbeatDataHigherEpoch() {
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L);
+ 1, 3L);
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L);
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9002", 1L, 10000000000L, null,
- 0, 3L);
+ 0, 3L);
}
-
public void mockHeartbeatDataHigherOffset() {
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L);
+ 1, 3L);
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L);
this.heartbeatManager.registerBroker("cluster1", "broker1",
"127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L);
}
@Test
@@ -159,7 +159,7 @@ public class ReplicasInfoManagerTest {
ElectPolicy electPolicy = new
DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo);
mockHeartbeatDataMasterStillAlive();
final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
- electPolicy);
+ electPolicy);
assertEquals(ResponseCode.CONTROLLER_INVALID_REQUEST,
cResult.getResponseCode());
}
@@ -170,7 +170,7 @@ public class ReplicasInfoManagerTest {
ElectPolicy electPolicy = new
DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo);
mockHeartbeatDataHigherEpoch();
final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
- electPolicy);
+ electPolicy);
System.out.println(cResult.getResponseCode());
final ElectMasterResponseHeader response = cResult.getResponse();
System.out.println(response);
@@ -186,7 +186,7 @@ public class ReplicasInfoManagerTest {
ElectPolicy electPolicy = new
DefaultElectPolicy(this.heartbeatManager::isBrokerActive,
this.heartbeatManager::getBrokerLiveInfo);
mockHeartbeatDataHigherOffset();
final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
- electPolicy);
+ electPolicy);
System.out.println(cResult.getResponseCode());
final ElectMasterResponseHeader response = cResult.getResponse();
System.out.println(response);
@@ -195,13 +195,12 @@ public class ReplicasInfoManagerTest {
assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
}
-
@Test
public void testElectMaster() {
mockMetaData();
final ElectMasterRequestHeader request = new
ElectMasterRequestHeader("broker1");
final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(request,
- new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
final ElectMasterResponseHeader response = cResult.getResponse();
assertEquals(response.getMasterEpoch(), 2);
assertFalse(response.getNewMasterAddress().isEmpty());
@@ -213,17 +212,17 @@ public class ReplicasInfoManagerTest {
brokerSet.add("127.0.0.1:9002");
final ElectMasterRequestHeader assignRequest = new
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
final ControllerResult<ElectMasterResponseHeader> cResult1 =
this.replicasInfoManager.electMaster(assignRequest,
- new DefaultElectPolicy((clusterName, brokerAddress) ->
brokerAddress.contains("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
brokerAddress.contains("127.0.0.1:9000"), null));
assertEquals(cResult1.getResponseCode(),
ResponseCode.CONTROLLER_INVALID_REQUEST);
final ElectMasterRequestHeader assignRequest1 = new
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
final ControllerResult<ElectMasterResponseHeader> cResult2 =
this.replicasInfoManager.electMaster(assignRequest1,
- new DefaultElectPolicy((clusterName, brokerAddress) ->
brokerAddress.equals("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
brokerAddress.equals("127.0.0.1:9000"), null));
assertEquals(cResult2.getResponseCode(),
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
final ElectMasterRequestHeader assignRequest2 = new
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
final ControllerResult<ElectMasterResponseHeader> cResult3 =
this.replicasInfoManager.electMaster(assignRequest2,
- new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
final ElectMasterResponseHeader response3 = cResult3.getResponse();
assertEquals(response3.getNewMasterAddress(), "127.0.0.1:9001");
@@ -244,7 +243,7 @@ public class ReplicasInfoManagerTest {
// However, the syncStateSet in statemachine is {"127.0.0.1:9000"},
not more replicas can be elected as master, it will be failed.
final ElectMasterRequestHeader electRequest = new
ElectMasterRequestHeader("broker1");
final ControllerResult<ElectMasterResponseHeader> cResult =
this.replicasInfoManager.electMaster(electRequest,
- new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
+ new DefaultElectPolicy((clusterName, brokerAddress) ->
!brokerAddress.equals("127.0.0.1:9000"), null));
final List<EventMessage> events = cResult.getEvents();
assertEquals(events.size(), 1);
final ElectMasterEvent event = (ElectMasterEvent) events.get(0);
@@ -257,4 +256,38 @@ public class ReplicasInfoManagerTest {
assertEquals(replicaInfo.getMasterEpoch(), 2);
}
+ @Test
+ public void testCleanBrokerData() { // only response codes and remarks are checked; returned events are not applied here
+ mockMetaData(); // case 1 below: one address of broker1, predicate always true -> request rejected
+ CleanControllerBrokerDataRequestHeader header1 = new
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+ ControllerResult<Void> result1 =
this.replicasInfoManager.cleanBrokerData(header1, (cluster, brokerAddr) ->
true);
+ assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
result1.getResponseCode());
+ // Case 2: no broker address given, predicate still true -> rejected with the "still alive" remark.
+ CleanControllerBrokerDataRequestHeader header2 = new
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", null);
+ ControllerResult<Void> result2 =
this.replicasInfoManager.cleanBrokerData(header2, (cluster, brokerAddr) ->
true);
+ assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
result2.getResponseCode());
+ assertEquals("Broker broker1 is still alive, clean up failure",
result2.getRemark());
+ // Case 3: same header contents as case 1, but the predicate returns false -> SUCCESS.
+ CleanControllerBrokerDataRequestHeader header3 = new
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+ ControllerResult<Void> result3 =
this.replicasInfoManager.cleanBrokerData(header3, (cluster, brokerAddr) ->
false);
+ assertEquals(ResponseCode.SUCCESS, result3.getResponseCode());
+ // Case 4: several ';'-separated addresses in one request, predicate false -> SUCCESS.
+ CleanControllerBrokerDataRequestHeader header4 = new
CleanControllerBrokerDataRequestHeader("cluster1", "broker1",
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002");
+ ControllerResult<Void> result4 =
this.replicasInfoManager.cleanBrokerData(header4, (cluster, brokerAddr) ->
false);
+ assertEquals(ResponseCode.SUCCESS, result4.getResponseCode());
+ /* Case 5: broker name "broker12" -> rejected with the "not existed" remark.
+ * The fourth constructor argument (true) is presumably isCleanLivingBroker - TODO confirm. */
+ CleanControllerBrokerDataRequestHeader header5 = new
CleanControllerBrokerDataRequestHeader("cluster1", "broker12",
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
+ ControllerResult<Void> result5 =
this.replicasInfoManager.cleanBrokerData(header5, (cluster, brokerAddr) ->
false);
+ assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
result5.getResponseCode());
+ assertEquals("Broker broker12 is not existed,clean broker data
failure.", result5.getRemark());
+ // Case 6: null cluster name with "broker12"; the predicate is true only for a non-null cluster -> still rejected.
+ CleanControllerBrokerDataRequestHeader header6 = new
CleanControllerBrokerDataRequestHeader(null, "broker12",
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
+ ControllerResult<Void> result6 =
this.replicasInfoManager.cleanBrokerData(header6, (cluster, brokerAddr) ->
cluster != null);
+ assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA,
result6.getResponseCode());
+ // Case 7: null cluster name with "broker1", fourth argument true, predicate false -> SUCCESS.
+ CleanControllerBrokerDataRequestHeader header7 = new
CleanControllerBrokerDataRequestHeader(null, "broker1",
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
+ ControllerResult<Void> result7 =
this.replicasInfoManager.cleanBrokerData(header7, (cluster, brokerAddr) ->
false);
+ assertEquals(ResponseCode.SUCCESS, result7.getResponseCode());
+
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 0b550b56b..985544fa0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -829,4 +829,10 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
String brokerName, String brokerAddr) throws RemotingException,
InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.electMaster(controllerAddr,
clusterName, brokerName, brokerAddr);
}
+ /** Forwards all arguments unchanged to {@code defaultMQAdminExtImpl.cleanControllerBrokerData}. */
+ @Override
+ public void cleanControllerBrokerData(String controllerAddr, String
clusterName, String brokerName,
+ String brokerAddr, boolean isCleanLivingBroker) throws
RemotingException, InterruptedException, MQBrokerException {
+ this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr,
clusterName, brokerName, brokerAddr,isCleanLivingBroker);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index acfd70115..00ca230a5 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -58,10 +58,6 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
-import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -71,6 +67,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -82,6 +79,7 @@ import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.common.protocol.body.InSyncStateData;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -89,6 +87,7 @@ import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -98,6 +97,7 @@ import
org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.GroupForbidden;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -1833,6 +1833,13 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQClientAPIImpl().updateControllerConfig(properties,
controllers, timeoutMillis);
}
+ @Override
+ public void cleanControllerBrokerData(String controllerAddr, String
clusterName, String brokerName,
+ String brokerAddr, boolean isCleanLivingBroker)
+ throws RemotingException, InterruptedException, MQBrokerException {
+ /* Forwards all arguments unchanged to MQClientAPIImpl; nothing is validated at this layer. */
this.mqClientInstance.getMQClientAPIImpl().cleanControllerBrokerData(controllerAddr,
clusterName, brokerName, brokerAddr, isCleanLivingBroker);
+ }
+
public MQClientInstance getMqClientInstance() {
return mqClientInstance;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 857f159b5..38da1e5ce 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -449,6 +449,7 @@ public interface MQAdminExt extends MQAdmin {
* manual trigger broker elect master
*
* @param controllerAddr controller address
+ * @param clusterName cluster name
* @param brokerName broker name
* @param brokerAddr broker address
* @return
@@ -457,6 +458,13 @@ public interface MQAdminExt extends MQAdmin {
* @throws MQBrokerException
*/
ElectMasterResponseHeader electMaster(String controllerAddr, String
clusterName, String brokerName,
- String brokerAddr)
- throws RemotingException, InterruptedException, MQBrokerException;
+ String brokerAddr) throws RemotingException, InterruptedException,
MQBrokerException;
+
+ /**
+ * Clean the metadata that a controller keeps for a broker.
+ *
+ * @param controllerAddr controller address
+ * @param clusterName cluster name
+ * @param brokerName broker name
+ * @param brokerAddr address of the broker whose metadata should be cleaned
+ * @param isCleanLivingBroker whether living brokers are cleaned up as well
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQBrokerException
+ */
+ void cleanControllerBrokerData(String controllerAddr, String clusterName,
String brokerName,
+ String brokerAddr, boolean isCleanLivingBroker) throws
RemotingException, InterruptedException, MQBrokerException;
+
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 52aded62d..c27dce1a2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -18,10 +18,10 @@ package org.apache.rocketmq.tools.command;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
-import java.util.ArrayList;
-import java.util.List;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
@@ -59,15 +59,16 @@ import
org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
+import
org.apache.rocketmq.tools.command.controller.CleanControllerBrokerDataSubCommand;
import
org.apache.rocketmq.tools.command.controller.GetControllerConfigSubCommand;
import
org.apache.rocketmq.tools.command.controller.GetControllerMetaDataSubCommand;
-import
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
import org.apache.rocketmq.tools.command.controller.ReElectMasterSubCommand;
+import
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
-import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
import org.apache.rocketmq.tools.command.ha.GetSyncStateSetSubCommand;
+import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
@@ -265,6 +266,7 @@ public class MQAdminStartup {
initCommand(new GetControllerConfigSubCommand());
initCommand(new UpdateControllerConfigSubCommand());
initCommand(new ReElectMasterSubCommand());
+ initCommand(new CleanControllerBrokerDataSubCommand());
}
private static void initLogback() throws Exception {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerDataSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerDataSubCommand.java
new file mode 100644
index 000000000..4b354ac7f
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerDataSubCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.controller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+/**
+ * Admin CLI sub-command {@code cleanBrokerData}: asks a controller to clean the data it holds for a broker.
+ */
+public class CleanControllerBrokerDataSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "cleanBrokerData";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Clean data of broker on controller";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ // -a (controllerAddress) and -n (brokerName) are required; -b, -c and -l are optional.
+ Option opt = new Option("a", "controllerAddress", true, "The address
of controller");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("b", "brokerAddress", true, "The address of the
broker which requires to clean metadata. eg:
192.168.0.1:30911;192.168.0.2:30911");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("n", "brokerName", true, "The broker name of the
replicas that require to be manipulated");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "the clusterName of
broker");
+ opt.setRequired(false);
+ options.addOption(opt);
+ // -l is declared without an argument: execute() only tests whether it is present.
+ opt = new Option("l", "cleanLivingBroker", false, " whether clean up
living brokers,default value is false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
+ // Reads the options, validates them, then sends the clean request through a short-lived admin client.
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ String controllerAddress = commandLine.getOptionValue('a').trim();
+ String brokerName = commandLine.getOptionValue('n').trim();
+ String clusterName = null;
+ String brokerAddress = null;
+
+ if (commandLine.hasOption('c')) {
+ clusterName = commandLine.getOptionValue('c').trim();
+ }
+ if (commandLine.hasOption('b')) {
+ brokerAddress = commandLine.getOptionValue('b').trim();
+ }
+ boolean isCleanLivingBroker = false;
+ if (commandLine.hasOption('l')) {
+ isCleanLivingBroker = true;
+ }
+ // Without -l a cluster name is mandatory; this check fails before the admin client is started.
+ if (!isCleanLivingBroker && StringUtils.isEmpty(clusterName)) {
+ throw new IllegalArgumentException("cleanLivingBroker option is
false,clusterName option can not be empty.");
+ }
+ // Any failure is wrapped in SubCommandException; the admin client is shut down in every case.
+ try {
+ defaultMQAdminExt.start();
+ defaultMQAdminExt.cleanControllerBrokerData(controllerAddress,
clusterName, brokerName, brokerAddress, isCleanLivingBroker);
+ System.out.printf("clear broker %s data from controller success!
\n", brokerName);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}