This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a961d1b44 [ISSUE #4817] Add a command to clear broker data from 
controller for CLI (#4823)
a961d1b44 is described below

commit a961d1b44a23af4abf0783931d362c071037135b
Author: mxsm <[email protected]>
AuthorDate: Sat Aug 27 19:42:55 2022 +0800

    [ISSUE #4817] Add a command to clear broker data from controller for CLI 
(#4823)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  33 ++++++-
 .../rocketmq/common/protocol/RequestCode.java      |   7 ++
 .../rocketmq/common/protocol/ResponseCode.java     |   2 +
 .../CleanControllerBrokerDataRequestHeader.java    |  99 ++++++++++++++++++++
 .../org/apache/rocketmq/controller/Controller.java |   9 +-
 .../rocketmq/controller/ControllerManager.java     |   1 +
 .../controller/impl/DLedgerController.java         |  28 +++---
 .../impl/DefaultBrokerHeartbeatManager.java        |   2 +-
 .../impl/event/CleanBrokerDataEvent.java           |  64 +++++++++++++
 .../controller/impl/event/EventSerializer.java     |   2 +
 .../rocketmq/controller/impl/event/EventType.java  |   5 +-
 .../controller/impl/manager/BrokerInfo.java        |   4 +
 .../impl/manager/ReplicasInfoManager.java          |  81 +++++++++++++++-
 .../controller/impl/manager/SyncStateInfo.java     |   4 +
 .../processor/ControllerRequestProcessor.java      |  18 +++-
 .../impl/manager/ReplicasInfoManagerTest.java      |  75 ++++++++++-----
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   6 ++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  15 ++-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  12 ++-
 .../rocketmq/tools/command/MQAdminStartup.java     |  10 +-
 .../CleanControllerBrokerDataSubCommand.java       | 103 +++++++++++++++++++++
 21 files changed, 522 insertions(+), 58 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7de364bcb..aa2280591 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import com.alibaba.fastjson.JSON;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -96,9 +97,9 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody;
 import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
@@ -185,6 +186,7 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHea
 import 
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
@@ -193,10 +195,10 @@ import 
org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpchook.DynamicalExtFieldRPCHook;
+import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.subscription.GroupForbidden;
-import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -217,8 +219,8 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
 import static 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
-import com.alibaba.fastjson.JSON;
 
 public class MQClientAPIImpl implements NameServerUpdateCallback {
     private final static InternalLogger log = ClientLogger.getLog();
@@ -3058,4 +3060,29 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
         }
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
+
+    /**
+     * Asks the controller leader to clean the metadata it keeps for a broker.
+     *
+     * @param controllerAddr address of any controller, used to look up the current leader
+     * @throws MQBrokerException if no leader address is known or the leader does not answer SUCCESS
+     */
+    public void cleanControllerBrokerData(String controllerAddr, String clusterName,
+        String brokerName, String brokerAddr, boolean isCleanLivingBroker)
+        throws RemotingException, InterruptedException, MQBrokerException {
+        final GetMetaDataResponseHeader controllerMetaData = this.getControllerMetaData(controllerAddr);
+        final String leaderAddress = controllerMetaData == null ? null : controllerMetaData.getControllerLeaderAddress();
+        // Assertions are disabled by default at runtime, so the leader address is checked explicitly.
+        if (leaderAddress == null || leaderAddress.isEmpty()) {
+            throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "Controller leader address is unknown, controllerAddr: " + controllerAddr);
+        }
+
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_BROKER_DATA,
+            new CleanControllerBrokerDataRequestHeader(clusterName, brokerName, brokerAddr, isCleanLivingBroker));
+        final RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 9441e4fd0..1d3393b57 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -269,4 +269,11 @@ public class RequestCode {
      * get config from controller
      */
     public static final int GET_CONTROLLER_CONFIG = 1010;
+
+    /**
+     * clean broker data from controller
+     */
+    public static final int CLEAN_BROKER_DATA = 1011;
+
+
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index f68bac279..ac8a286ee 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -111,4 +111,6 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int CONTROLLER_BROKER_METADATA_NOT_EXIST = 2008;
 
+    public static final int CONTROLLER_INVALID_CLEAN_BROKER_METADATA = 2009;
+
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/CleanControllerBrokerDataRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/CleanControllerBrokerDataRequestHeader.java
new file mode 100644
index 000000000..2600a52f0
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/CleanControllerBrokerDataRequestHeader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class CleanControllerBrokerDataRequestHeader implements 
CommandCustomHeader {
+
+    @CFNullable
+    private String clusterName;
+
+    @CFNotNull
+    private String brokerName;
+
+    @CFNullable
+    private String brokerAddress;
+
+    private boolean isCleanLivingBroker = false;
+
+    public CleanControllerBrokerDataRequestHeader() {
+    }
+
+    public CleanControllerBrokerDataRequestHeader(String clusterName, String 
brokerName, String brokerAddress,
+        boolean isCleanLivingBroker) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerAddress = brokerAddress;
+        this.isCleanLivingBroker = isCleanLivingBroker;
+    }
+
+    public CleanControllerBrokerDataRequestHeader(String clusterName, String 
brokerName, String brokerAddress) {
+        this(clusterName, brokerName, brokerAddress, false);
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
+
+    public void setBrokerAddress(String brokerAddress) {
+        this.brokerAddress = brokerAddress;
+    }
+
+    public boolean isCleanLivingBroker() {
+        return isCleanLivingBroker;
+    }
+
+    public void setCleanLivingBroker(boolean cleanLivingBroker) {
+        isCleanLivingBroker = cleanLivingBroker;
+    }
+
+    @Override
+    public String toString() {
+        return "CleanControllerBrokerDataRequestHeader{" +
+            "clusterName='" + clusterName + '\'' +
+            ", brokerName='" + brokerName + '\'' +
+            ", brokerAddress='" + brokerAddress + '\'' +
+            ", isCleanLivingBroker=" + isCleanLivingBroker +
+            '}';
+    }
+}
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java 
b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 1cd724307..628469e67 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -21,9 +21,10 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -106,4 +107,10 @@ public interface Controller {
      * Get the remotingServer used by the controller, the upper layer will 
reuse this remotingServer.
      */
     RemotingServer getRemotingServer();
+
+    /**
+     * Clean controller broker data.
+     * @param requestHeader names the broker (and optionally its addresses) whose data is cleaned
+     */
+    CompletableFuture<RemotingCommand> cleanBrokerData(final 
CleanControllerBrokerDataRequestHeader requestHeader);
 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index cd4a60158..f8ae7eb4f 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -189,6 +189,7 @@ public class ControllerManager {
         
controllerRemotingServer.registerProcessor(RequestCode.BROKER_HEARTBEAT, 
controllerRequestProcessor, this.controllerRequestExecutor);
         
controllerRemotingServer.registerProcessor(RequestCode.UPDATE_CONTROLLER_CONFIG,
 controllerRequestProcessor, this.controllerRequestExecutor);
         
controllerRemotingServer.registerProcessor(RequestCode.GET_CONTROLLER_CONFIG, 
controllerRequestProcessor, this.controllerRequestExecutor);
+        
controllerRemotingServer.registerProcessor(RequestCode.CLEAN_BROKER_DATA, 
controllerRequestProcessor, this.controllerRequestExecutor);
     }
 
     public void start() {
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index c6a8b3345..1e34aad97 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -37,18 +37,18 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiPredicate;
 import java.util.function.Supplier;
-
+import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
 import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
@@ -84,7 +84,6 @@ public class DLedgerController implements Controller {
     // use for elect a master
     private ElectPolicy electPolicy;
 
-
     private AtomicBoolean isScheduling = new AtomicBoolean(false);
 
     public DLedgerController(final ControllerConfig config, final 
BiPredicate<String, String> brokerAlivePredicate) {
@@ -92,9 +91,9 @@ public class DLedgerController implements Controller {
     }
 
     public DLedgerController(final ControllerConfig controllerConfig,
-                             final BiPredicate<String, String> 
brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
-                             final NettyClientConfig nettyClientConfig, final 
ChannelEventListener channelEventListener,
-                             final ElectPolicy electPolicy) {
+        final BiPredicate<String, String> brokerAlivePredicate, final 
NettyServerConfig nettyServerConfig,
+        final NettyClientConfig nettyClientConfig, final ChannelEventListener 
channelEventListener,
+        final ElectPolicy electPolicy) {
         this.controllerConfig = controllerConfig;
         this.eventSerializer = new EventSerializer();
         this.scheduler = new EventScheduler();
@@ -154,7 +153,7 @@ public class DLedgerController implements Controller {
 
     @Override
     public CompletableFuture<RemotingCommand> 
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
-                                                                final 
SyncStateSet syncStateSet) {
+        final SyncStateSet syncStateSet) {
         return this.scheduler.appendEvent("alterSyncStateSet",
             () -> this.replicasInfoManager.alterSyncStateSet(request, 
syncStateSet, this.brokerAlivePredicate), true);
     }
@@ -193,7 +192,7 @@ public class DLedgerController implements Controller {
             sb.append(peer).append(";");
         }
         return 
RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, new 
GetMetaDataResponseHeader(
-                state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), 
state.isLeader(), sb.toString()));
+            state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), 
state.isLeader(), sb.toString()));
     }
 
     @Override
@@ -201,6 +200,13 @@ public class DLedgerController implements Controller {
         return this.dLedgerServer.getRemotingServer();
     }
 
+    @Override
+    public CompletableFuture<RemotingCommand> cleanBrokerData(
+        final CleanControllerBrokerDataRequestHeader requestHeader) {
+        return this.scheduler.appendEvent("cleanBrokerData",
+            () -> this.replicasInfoManager.cleanBrokerData(requestHeader, 
this.brokerAlivePredicate), true);
+    }
+
     /**
      * Append the request to dledger, wait the dledger to commit the request.
      */
@@ -288,7 +294,7 @@ public class DLedgerController implements Controller {
         }
 
         public <T> CompletableFuture<RemotingCommand> appendEvent(final String 
name,
-                                                                  final 
Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
+            final Supplier<ControllerResult<T>> supplier, boolean 
isWriteEvent) {
             if (isStopped() || 
!DLedgerController.this.roleHandler.isLeaderState()) {
                 final RemotingCommand command = 
RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The 
controller is not in leader state");
                 final CompletableFuture<RemotingCommand> future = new 
CompletableFuture<>();
@@ -325,7 +331,7 @@ public class DLedgerController implements Controller {
         private final boolean isWriteEvent;
 
         ControllerEventHandler(final String name, final 
Supplier<ControllerResult<T>> supplier,
-                               final boolean isWriteEvent) {
+            final boolean isWriteEvent) {
             this.name = name;
             this.supplier = supplier;
             this.future = new CompletableFuture<>();
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index e56d97c99..c369d8353 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -30,9 +30,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.rocketmq.common.BrokerAddrInfo;
+import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
 import org.apache.rocketmq.controller.BrokerLiveInfo;
 import org.apache.rocketmq.logging.InternalLogger;
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
new file mode 100644
index 000000000..4678f90c4
--- /dev/null
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/CleanBrokerDataEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.impl.event;
+
+import java.util.Set;
+
+public class CleanBrokerDataEvent implements EventMessage {
+
+    private String brokerName;
+
+    private Set<String> brokerAddressSet;
+
+    public CleanBrokerDataEvent(String brokerName, Set<String> 
brokerAddressSet) {
+        this.brokerName = brokerName;
+        this.brokerAddressSet = brokerAddressSet;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public Set<String> getBrokerAddressSet() {
+        return brokerAddressSet;
+    }
+
+    public void setBrokerAddressSet(Set<String> brokerAddressSet) {
+        this.brokerAddressSet = brokerAddressSet;
+    }
+
+    /**
+     * Returns the event type of this message
+     */
+    @Override
+    public EventType getEventType() {
+        return EventType.CLEAN_BROKER_DATA_EVENT;
+    }
+
+    @Override
+    public String toString() {
+        return "CleanBrokerDataEvent{" +
+            "brokerName='" + brokerName + '\'' +
+            ", brokerAddressSet=" + brokerAddressSet +
+            '}';
+    }
+}
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
index f78f399a5..d49616f2d 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
@@ -67,6 +67,8 @@ public class EventSerializer {
                         return this.serializer.deserialize(data, 
ApplyBrokerIdEvent.class);
                     case ELECT_MASTER_EVENT:
                         return this.serializer.deserialize(data, 
ElectMasterEvent.class);
+                    case CLEAN_BROKER_DATA_EVENT:
+                        return this.serializer.deserialize(data, 
CleanBrokerDataEvent.class);
                     default:
                         break;
                 }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
index f6971cbdf..6f100438e 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
@@ -23,7 +23,8 @@ public enum EventType {
     ALTER_SYNC_STATE_SET_EVENT("AlterSyncStateSetEvent", (short) 1),
     APPLY_BROKER_ID_EVENT("ApplyBrokerIdEvent", (short) 2),
     ELECT_MASTER_EVENT("ElectMasterEvent", (short) 3),
-    READ_EVENT("ReadEvent", (short) 4);
+    READ_EVENT("ReadEvent", (short) 4),
+    CLEAN_BROKER_DATA_EVENT("CleanBrokerDataEvent", (short) 5);
 
     private final String name;
     private final short id;
@@ -43,6 +44,8 @@ public enum EventType {
                 return ELECT_MASTER_EVENT;
             case 4:
                 return READ_EVENT;
+            case 5:
+                return CLEAN_BROKER_DATA_EVENT;
         }
         return null;
     }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index 2c283c08c..d6e203ca6 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -38,6 +38,10 @@ public class BrokerInfo {
         this.brokerIdTable = new HashMap<>();
     }
 
+    public void removeBrokerAddress(final String address) {
+        this.brokerIdTable.remove(address);
+    }
+
     public long newBrokerId() {
         return this.brokerIdCount.incrementAndGet();
     }
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index ca36ae2e3..683a17d4e 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -23,26 +23,29 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiPredicate;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.common.protocol.body.InSyncStateData;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
 import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.controller.impl.event.CleanBrokerDataEvent;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
@@ -150,7 +153,8 @@ public class ReplicasInfoManager {
         return result;
     }
 
-    public ControllerResult<ElectMasterResponseHeader> electMaster(final 
ElectMasterRequestHeader request, final ElectPolicy electPolicy) {
+    public ControllerResult<ElectMasterResponseHeader> electMaster(final 
ElectMasterRequestHeader request,
+        final ElectPolicy electPolicy) {
         final String brokerName = request.getBrokerName();
         final String assignBrokerAddress = request.getBrokerAddress();
         final ControllerResult<ElectMasterResponseHeader> result = new 
ControllerResult<>(new ElectMasterResponseHeader());
@@ -316,6 +320,43 @@ public class ReplicasInfoManager {
         return result;
     }
 
+    /**
+     * Validates a clean request and, when accepted, emits the event removing the broker group or only the listed addresses.
+     * Unless isCleanLivingBroker is set, the request is rejected while a targeted broker is still alive.
+     */
+    public ControllerResult<Void> cleanBrokerData(final CleanControllerBrokerDataRequestHeader requestHeader,
+        final BiPredicate<String, String> brokerAlivePredicate) {
+        final ControllerResult<Void> result = new ControllerResult<>();
+        final String clusterName = requestHeader.getClusterName();
+        final String brokerName = requestHeader.getBrokerName();
+        final String brokerAddrs = requestHeader.getBrokerAddress();
+        // Parse the addresses before the liveness checks: a forced clean (isCleanLivingBroker) must still be limited to them.
+        final Set<String> brokerAddressSet = StringUtils.isBlank(brokerAddrs) ? null : Stream.of(brokerAddrs.split(";")).collect(Collectors.toSet());
+        if (!requestHeader.isCleanLivingBroker()) {
+            if (null == brokerAddressSet) {
+                //if SyncStateInfo.masterAddress is not empty, at least one broker with the same BrokerName is alive
+                final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
+                if (null != syncStateInfo && StringUtils.isNotEmpty(syncStateInfo.getMasterAddress())) {
+                    result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, String.format("Broker %s is still alive, clean up failure", brokerName));
+                    return result;
+                }
+            } else {
+                for (String brokerAddr : brokerAddressSet) {
+                    if (brokerAlivePredicate.test(clusterName, brokerAddr)) {
+                        result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, String.format("Broker [%s,  %s] is still alive, clean up failure", brokerName, brokerAddr));
+                        return result;
+                    }
+                }
+            }
+        }
+        if (isContainsBroker(brokerName)) {
+            result.addEvent(new CleanBrokerDataEvent(brokerName, brokerAddressSet));
+            return result;
+        }
+        result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, String.format("Broker %s is not existed,clean broker data failure.", brokerName));
+        return result;
+    }
+
     /**
      * Apply events to memory statemachine.
      *
@@ -333,6 +374,9 @@ public class ReplicasInfoManager {
             case ELECT_MASTER_EVENT:
                 handleElectMaster((ElectMasterEvent) event);
                 break;
+            case CLEAN_BROKER_DATA_EVENT:
+                handleCleanBrokerDataEvent((CleanBrokerDataEvent) event);
+                break;
             default:
                 break;
         }
@@ -387,6 +431,33 @@ public class ReplicasInfoManager {
         }
     }
 
+    private void handleCleanBrokerDataEvent(final CleanBrokerDataEvent event) {
+
+        final String brokerName = event.getBrokerName();
+        final Set<String> brokerAddressSet = event.getBrokerAddressSet();
+
+        if (null == brokerAddressSet || brokerAddressSet.isEmpty()) {
+            this.replicaInfoTable.remove(brokerName);
+            this.syncStateSetInfoTable.remove(brokerName);
+            return;
+        }
+        if (!isContainsBroker(brokerName)) {
+            return;
+        }
+        final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+        final SyncStateInfo syncStateInfo = 
this.syncStateSetInfoTable.get(brokerName);
+        for (String brokerAddress : brokerAddressSet) {
+            brokerInfo.removeBrokerAddress(brokerAddress);
+            syncStateInfo.removeSyncState(brokerAddress);
+        }
+        if (brokerInfo.getBrokerIdTable().isEmpty()) {
+            this.replicaInfoTable.remove(brokerName);
+        }
+        if (syncStateInfo.getSyncStateSet().isEmpty()) {
+            this.syncStateSetInfoTable.remove(brokerName);
+        }
+    }
+
     /**
      * Is the broker existed in the memory metadata
      *
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 346d4ccdd..997dee3c5 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -79,4 +79,8 @@ public class SyncStateInfo {
     public int getMasterEpoch() {
         return masterEpoch;
     }
+
+    public void removeSyncState(final String address) {
+        syncStateSet.remove(address);
+    }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 95d4c2b10..956151975 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -17,13 +17,11 @@
 package org.apache.rocketmq.controller.processor;
 
 import io.netty.channel.ChannelHandlerContext;
-
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,6 +29,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
@@ -46,6 +45,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import static org.apache.rocketmq.common.protocol.RequestCode.BROKER_HEARTBEAT;
+import static 
org.apache.rocketmq.common.protocol.RequestCode.CLEAN_BROKER_DATA;
 import static 
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
 import static 
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
 import static 
org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
@@ -73,9 +73,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
     public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
         if (ctx != null) {
             log.debug("Receive request, {} {} {}",
-                    request.getCode(),
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                    request);
+                request.getCode(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                request);
         }
         switch (request.getCode()) {
             case CONTROLLER_ALTER_SYNC_STATE_SET: {
@@ -153,6 +153,13 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                 return this.updateControllerConfig(ctx, request);
             case GET_CONTROLLER_CONFIG:
                 return this.getControllerConfig(ctx, request);
+            case CLEAN_BROKER_DATA:
+                final CleanControllerBrokerDataRequestHeader requestHeader = 
(CleanControllerBrokerDataRequestHeader) 
request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+                final CompletableFuture<RemotingCommand> future = 
this.controllerManager.getController().cleanBrokerData(requestHeader);
+                if (null != future) {
+                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                }
+                break;
             default: {
                 final String error = " request type " + request.getCode() + " 
not supported";
                 return 
RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, 
error);
@@ -220,4 +227,5 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
         response.setRemark(null);
         return response;
     }
+
 }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 915e383b4..127f07f79 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
@@ -75,7 +76,7 @@ public class ReplicasInfoManagerTest {
         boolean isFirstRegisteredBroker) {
         // Register new broker
         final RegisterBrokerToControllerRequestHeader registerRequest =
-                new RegisterBrokerToControllerRequestHeader(clusterName, 
brokerName, brokerAddress);
+            new RegisterBrokerToControllerRequestHeader(clusterName, 
brokerName, brokerAddress);
         final ControllerResult<RegisterBrokerToControllerResponseHeader> 
registerResult = this.replicasInfoManager.registerBroker(registerRequest);
         apply(registerResult.getEvents());
 
@@ -94,7 +95,7 @@ public class ReplicasInfoManagerTest {
     private boolean alterNewInSyncSet(String brokerName, String masterAddress, 
int masterEpoch,
         Set<String> newSyncStateSet, int syncStateSetEpoch) {
         final AlterSyncStateSetRequestHeader alterRequest =
-                new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
+            new AlterSyncStateSetRequestHeader(brokerName, masterAddress, 
masterEpoch);
         final ControllerResult<AlterSyncStateSetResponseHeader> result = 
this.replicasInfoManager.alterSyncStateSet(alterRequest, new 
SyncStateSet(newSyncStateSet, syncStateSetEpoch), (va1, va2) -> true);
         apply(result.getEvents());
 
@@ -126,30 +127,29 @@ public class ReplicasInfoManagerTest {
 
     public void mockHeartbeatDataMasterStillAlive() {
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9000", 1L, 10000000000L, null,
-                1, 3L);
+            1, 3L);
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
-                1, 2L);
+            1, 2L);
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
-                1, 3L);
+            1, 3L);
     }
 
     public void mockHeartbeatDataHigherEpoch() {
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
-                1, 3L);
+            1, 3L);
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
-                1, 2L);
+            1, 2L);
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
-                0, 3L);
+            0, 3L);
     }
 
-
     public void mockHeartbeatDataHigherOffset() {
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9000", 1L, -10000L, null,
-                1, 3L);
+            1, 3L);
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9001", 1L, 10000000000L, null,
-                1, 2L);
+            1, 2L);
         this.heartbeatManager.registerBroker("cluster1", "broker1", 
"127.0.0.1:9002", 1L, 10000000000L, null,
-                1, 3L);
+            1, 3L);
     }
 
     @Test
@@ -159,7 +159,7 @@ public class ReplicasInfoManagerTest {
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataMasterStillAlive();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
-                electPolicy);
+            electPolicy);
         assertEquals(ResponseCode.CONTROLLER_INVALID_REQUEST, 
cResult.getResponseCode());
     }
 
@@ -170,7 +170,7 @@ public class ReplicasInfoManagerTest {
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherEpoch();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
-                electPolicy);
+            electPolicy);
         System.out.println(cResult.getResponseCode());
         final ElectMasterResponseHeader response = cResult.getResponse();
         System.out.println(response);
@@ -186,7 +186,7 @@ public class ReplicasInfoManagerTest {
         ElectPolicy electPolicy = new 
DefaultElectPolicy(this.heartbeatManager::isBrokerActive, 
this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherOffset();
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
-                electPolicy);
+            electPolicy);
         System.out.println(cResult.getResponseCode());
         final ElectMasterResponseHeader response = cResult.getResponse();
         System.out.println(response);
@@ -195,13 +195,12 @@ public class ReplicasInfoManagerTest {
         assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
     }
 
-
     @Test
     public void testElectMaster() {
         mockMetaData();
         final ElectMasterRequestHeader request = new 
ElectMasterRequestHeader("broker1");
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(request,
-                new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
         assertFalse(response.getNewMasterAddress().isEmpty());
@@ -213,17 +212,17 @@ public class ReplicasInfoManagerTest {
         brokerSet.add("127.0.0.1:9002");
         final ElectMasterRequestHeader assignRequest = new 
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
         final ControllerResult<ElectMasterResponseHeader> cResult1 = 
this.replicasInfoManager.electMaster(assignRequest,
-                new DefaultElectPolicy((clusterName, brokerAddress) -> 
brokerAddress.contains("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> 
brokerAddress.contains("127.0.0.1:9000"), null));
         assertEquals(cResult1.getResponseCode(), 
ResponseCode.CONTROLLER_INVALID_REQUEST);
 
         final ElectMasterRequestHeader assignRequest1 = new 
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
         final ControllerResult<ElectMasterResponseHeader> cResult2 = 
this.replicasInfoManager.electMaster(assignRequest1,
-                new DefaultElectPolicy((clusterName, brokerAddress) -> 
brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> 
brokerAddress.equals("127.0.0.1:9000"), null));
         assertEquals(cResult2.getResponseCode(), 
ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
 
         final ElectMasterRequestHeader assignRequest2 = new 
ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
         final ControllerResult<ElectMasterResponseHeader> cResult3 = 
this.replicasInfoManager.electMaster(assignRequest2,
-                new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
         assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
         final ElectMasterResponseHeader response3 = cResult3.getResponse();
         assertEquals(response3.getNewMasterAddress(), "127.0.0.1:9001");
@@ -244,7 +243,7 @@ public class ReplicasInfoManagerTest {
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
         final ElectMasterRequestHeader electRequest = new 
ElectMasterRequestHeader("broker1");
         final ControllerResult<ElectMasterResponseHeader> cResult = 
this.replicasInfoManager.electMaster(electRequest,
-                new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> 
!brokerAddress.equals("127.0.0.1:9000"), null));
         final List<EventMessage> events = cResult.getEvents();
         assertEquals(events.size(), 1);
         final ElectMasterEvent event = (ElectMasterEvent) events.get(0);
@@ -257,4 +256,38 @@ public class ReplicasInfoManagerTest {
         assertEquals(replicaInfo.getMasterEpoch(), 2);
     }
 
+    @Test
+    public void testCleanBrokerData() {
+        mockMetaData();
+        CleanControllerBrokerDataRequestHeader header1 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+        ControllerResult<Void> result1 = 
this.replicasInfoManager.cleanBrokerData(header1, (cluster, brokerAddr) -> 
true);
+        assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result1.getResponseCode());
+
+        CleanControllerBrokerDataRequestHeader header2 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", null);
+        ControllerResult<Void> result2 = 
this.replicasInfoManager.cleanBrokerData(header2, (cluster, brokerAddr) -> 
true);
+        assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result2.getResponseCode());
+        assertEquals("Broker broker1 is still alive, clean up failure", 
result2.getRemark());
+
+        CleanControllerBrokerDataRequestHeader header3 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+        ControllerResult<Void> result3 = 
this.replicasInfoManager.cleanBrokerData(header3, (cluster, brokerAddr) -> 
false);
+        assertEquals(ResponseCode.SUCCESS, result3.getResponseCode());
+
+        CleanControllerBrokerDataRequestHeader header4 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker1", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002");
+        ControllerResult<Void> result4 = 
this.replicasInfoManager.cleanBrokerData(header4, (cluster, brokerAddr) -> 
false);
+        assertEquals(ResponseCode.SUCCESS, result4.getResponseCode());
+
+        CleanControllerBrokerDataRequestHeader header5 = new 
CleanControllerBrokerDataRequestHeader("cluster1", "broker12", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
+        ControllerResult<Void> result5 = 
this.replicasInfoManager.cleanBrokerData(header5, (cluster, brokerAddr) -> 
false);
+        assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result5.getResponseCode());
+        assertEquals("Broker broker12 is not existed,clean broker data 
failure.", result5.getRemark());
+
+        CleanControllerBrokerDataRequestHeader header6 = new 
CleanControllerBrokerDataRequestHeader(null, "broker12", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
+        ControllerResult<Void> result6 = 
this.replicasInfoManager.cleanBrokerData(header6, (cluster, brokerAddr) -> 
cluster != null);
+        assertEquals(ResponseCode.CONTROLLER_INVALID_CLEAN_BROKER_METADATA, 
result6.getResponseCode());
+
+        CleanControllerBrokerDataRequestHeader header7 = new 
CleanControllerBrokerDataRequestHeader(null, "broker1", 
"127.0.0.1:9000;127.0.0.1:9001;127.0.0.1:9002", true);
+        ControllerResult<Void> result7 = 
this.replicasInfoManager.cleanBrokerData(header7, (cluster, brokerAddr) -> 
false);
+        assertEquals(ResponseCode.SUCCESS, result7.getResponseCode());
+
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 0b550b56b..985544fa0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -829,4 +829,10 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         String brokerName, String brokerAddr) throws RemotingException, 
InterruptedException, MQBrokerException {
         return this.defaultMQAdminExtImpl.electMaster(controllerAddr, 
clusterName, brokerName, brokerAddr);
     }
+
+    @Override
+    public void cleanControllerBrokerData(String controllerAddr, String 
clusterName, String brokerName,
+        String brokerAddr, boolean isCleanLivingBroker) throws 
RemotingException, InterruptedException, MQBrokerException {
+        this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr, 
clusterName, brokerName, brokerAddr,isCleanLivingBroker);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index acfd70115..00ca230a5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -58,10 +58,6 @@ import org.apache.rocketmq.common.admin.RollbackStats;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -71,6 +67,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -82,6 +79,7 @@ import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
 import org.apache.rocketmq.common.protocol.body.InSyncStateData;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -89,6 +87,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -98,6 +97,7 @@ import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.subscription.GroupForbidden;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -1833,6 +1833,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         
this.mqClientInstance.getMQClientAPIImpl().updateControllerConfig(properties, 
controllers, timeoutMillis);
     }
 
+    @Override
+    public void cleanControllerBrokerData(String controllerAddr, String 
clusterName, String brokerName,
+        String brokerAddr, boolean isCleanLivingBroker)
+        throws RemotingException, InterruptedException, MQBrokerException {
+        
this.mqClientInstance.getMQClientAPIImpl().cleanControllerBrokerData(controllerAddr,
 clusterName, brokerName, brokerAddr, isCleanLivingBroker);
+    }
+
     public MQClientInstance getMqClientInstance() {
         return mqClientInstance;
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 857f159b5..38da1e5ce 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -449,6 +449,7 @@ public interface MQAdminExt extends MQAdmin {
      * manual trigger broker elect master
      *
      * @param controllerAddr controller address
+     * @param clusterName    cluster name
      * @param brokerName     broker name
      * @param brokerAddr     broker address
      * @return
@@ -457,6 +458,13 @@ public interface MQAdminExt extends MQAdmin {
      * @throws MQBrokerException
      */
     ElectMasterResponseHeader electMaster(String controllerAddr, String 
clusterName, String brokerName,
-        String brokerAddr)
-        throws RemotingException, InterruptedException, MQBrokerException;
+        String brokerAddr) throws RemotingException, InterruptedException, 
MQBrokerException;
+
+    /**
+     * clean controller broker meta data
+     *
+     */
+    void cleanControllerBrokerData(String controllerAddr, String clusterName, 
String brokerName,
+        String brokerAddr, boolean isCleanLivingBroker) throws 
RemotingException, InterruptedException, MQBrokerException;
+
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 52aded62d..c27dce1a2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -18,10 +18,10 @@ package org.apache.rocketmq.tools.command;
 
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
-import java.util.ArrayList;
-import java.util.List;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
@@ -59,15 +59,16 @@ import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
 import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
 import org.apache.rocketmq.tools.command.container.AddBrokerSubCommand;
 import org.apache.rocketmq.tools.command.container.RemoveBrokerSubCommand;
+import 
org.apache.rocketmq.tools.command.controller.CleanControllerBrokerDataSubCommand;
 import 
org.apache.rocketmq.tools.command.controller.GetControllerConfigSubCommand;
 import 
org.apache.rocketmq.tools.command.controller.GetControllerMetaDataSubCommand;
-import 
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
 import org.apache.rocketmq.tools.command.controller.ReElectMasterSubCommand;
+import 
org.apache.rocketmq.tools.command.controller.UpdateControllerConfigSubCommand;
 import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
 import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
-import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
 import org.apache.rocketmq.tools.command.ha.GetSyncStateSetSubCommand;
+import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
 import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
 import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
@@ -265,6 +266,7 @@ public class MQAdminStartup {
         initCommand(new GetControllerConfigSubCommand());
         initCommand(new UpdateControllerConfigSubCommand());
         initCommand(new ReElectMasterSubCommand());
+        initCommand(new CleanControllerBrokerDataSubCommand());
     }
 
     private static void initLogback() throws Exception {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerDataSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerDataSubCommand.java
new file mode 100644
index 000000000..4b354ac7f
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/CleanControllerBrokerDataSubCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.controller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class CleanControllerBrokerDataSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "cleanBrokerData";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Clean data of broker on controller";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+
+        Option opt = new Option("a", "controllerAddress", true, "The address 
of controller");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerAddress", true, "The address of the 
broker which requires to clean metadata. eg: 
192.168.0.1:30911;192.168.0.2:30911");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("n", "brokerName", true, "The broker name of the 
replicas that require to be manipulated");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "the clusterName of 
broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("l", "cleanLivingBroker", false, " whether clean up 
living brokers,default value is false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        String controllerAddress = commandLine.getOptionValue('a').trim();
+        String brokerName = commandLine.getOptionValue('n').trim();
+        String clusterName = null;
+        String brokerAddress = null;
+
+        if (commandLine.hasOption('c')) {
+            clusterName = commandLine.getOptionValue('c').trim();
+        }
+        if (commandLine.hasOption('b')) {
+            brokerAddress = commandLine.getOptionValue('b').trim();
+        }
+        boolean isCleanLivingBroker = false;
+        if (commandLine.hasOption('l')) {
+            isCleanLivingBroker = true;
+        }
+
+        if (!isCleanLivingBroker && StringUtils.isEmpty(clusterName)) {
+            throw new IllegalArgumentException("cleanLivingBroker option is 
false,clusterName option can not be empty.");
+        }
+
+        try {
+            defaultMQAdminExt.start();
+            defaultMQAdminExt.cleanControllerBrokerData(controllerAddress, 
clusterName, brokerName, brokerAddress, isCleanLivingBroker);
+            System.out.printf("clear broker %s data from controller success! 
\n", brokerName);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}

Reply via email to