This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 32096c9f04 [IoTDB-3052] ConfigNode shrinking process (#6329)
32096c9f04 is described below
commit 32096c9f042d3402ac6f2c30b3810c2c13d64580
Author: leety <[email protected]>
AuthorDate: Wed Jun 22 12:19:07 2022 +0800
[IoTDB-3052] ConfigNode shrinking process (#6329)
---
.../assembly/resources/sbin/remove-confignode.sh | 79 ++++++++++++++++
.../client/SyncConfigNodeClientPool.java | 65 +++++++++++++
.../confignode/conf/ConfigNodeRemoveCheck.java | 102 +++++++++++++++++++++
.../consensus/request/ConfigRequest.java | 4 +
.../consensus/request/ConfigRequestType.java | 1 +
.../request/write/RemoveConfigNodeReq.java | 77 ++++++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 14 +++
.../iotdb/confignode/manager/ConsensusManager.java | 33 +++++++
.../apache/iotdb/confignode/manager/Manager.java | 8 ++
.../iotdb/confignode/manager/NodeManager.java | 74 +++++++++++++++
.../iotdb/confignode/persistence/NodeInfo.java | 29 ++++++
.../executor/ConfigRequestExecutor.java | 3 +
.../iotdb/confignode/service/ConfigNode.java | 45 ++++++++-
.../confignode/service/ConfigNodeCommandLine.java | 8 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 61 ++++++++++++
.../consensus/request/ConfigRequestSerDeTest.java | 12 +++
.../apache/iotdb/db/client/ConfigNodeClient.java | 32 +++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 6 +-
.../src/main/thrift/confignode.thrift | 4 +
19 files changed, 650 insertions(+), 7 deletions(-)
diff --git a/confignode/src/assembly/resources/sbin/remove-confignode.sh
b/confignode/src/assembly/resources/sbin/remove-confignode.sh
new file mode 100644
index 0000000000..eace0d2912
--- /dev/null
+++ b/confignode/src/assembly/resources/sbin/remove-confignode.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ----------------------------
+echo Starting to remove IoTDB ConfigNode
+echo ----------------------------
+
+if [ -z "${CONFIGNODE_HOME}" ]; then
+ export CONFIGNODE_HOME="`dirname "$0"`/.."
+fi
+
+CONFIGNODE_CONF=${CONFIGNODE_HOME}/conf
+CONFIGNODE_LOGS=${CONFIGNODE_HOME}/logs
+
+is_conf_path=false
+for arg do
+ shift
+ if [ "$arg" == "-c" ]; then
+ is_conf_path=true
+ continue
+ fi
+ if [ $is_conf_path == true ]; then
+ CONFIGNODE_CONF=$arg
+ is_conf_path=false
+ continue
+ fi
+ set -- "$@" "$arg"
+done
+
+CONF_PARAMS="-r "$*
+
+if [ -f "$CONFIGNODE_CONF/confignode-env.sh" ]; then
+ if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+ . "$CONFIGNODE_CONF/confignode-env.sh" "printgc"
+ else
+ . "$CONFIGNODE_CONF/confignode-env.sh"
+ fi
+else
+ echo "can't find $CONFIGNODE_CONF/confignode-env.sh"
+fi
+
+CLASSPATH=""
+for f in ${CONFIGNODE_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.confignode.service.ConfigNode
+
+launch_service()
+{
+ class="$1"
+
confignode_parms="-Dlogback.configurationFile=${CONFIGNODE_CONF}/logback.xml"
+ confignode_parms="$confignode_parms
-DCONFIGNODE_HOME=${CONFIGNODE_HOME}"
+ confignode_parms="$confignode_parms
-DCONFIGNODE_CONF=${CONFIGNODE_CONF}"
+ exec "$JAVA" $illegal_access_params $confignode_parms
$CONFIGNODE_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
+ return $?
+}
+
+# Start up the service
+launch_service "$classname"
+
+exit $?
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
index 484687eacd..805f9d4a4e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/** Synchronously send RPC requests to ConfigNode. See confignode.thrift for
more details. */
@@ -42,11 +43,22 @@ public class SyncConfigNodeClientPool {
private final IClientManager<TEndPoint, SyncConfigNodeIServiceClient>
clientManager;
+ private TEndPoint configNodeLeader;
+
private SyncConfigNodeClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
.createClientManager(
new
DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+ configNodeLeader = new TEndPoint();
+ }
+
+ private void updateConfigNodeLeader(TSStatus status) {
+ if (status.isSetRedirectNode()) {
+ configNodeLeader = status.getRedirectNode();
+ } else {
+ configNodeLeader = null;
+ }
}
/** Only use registerConfigNode when the ConfigNode is first startup. */
@@ -83,6 +95,59 @@ public class SyncConfigNodeClientPool {
.setMessage("All retry failed.");
}
+ /**
+ * ConfigNode Leader stop any ConfigNode in the cluster
+ *
+ * @param configNodeLocations confignode_list of confignode-system.properties
+ * @param configNodeLocation To be removed ConfigNode
+ * @return SUCCESS_STATUS: remove ConfigNode success, other status remove
failed
+ */
+ public TSStatus removeConfigNode(
+ List<TConfigNodeLocation> configNodeLocations, TConfigNodeLocation
configNodeLocation) {
+ // TODO: Unified retry logic
+ for (TConfigNodeLocation nodeLocation : configNodeLocations) {
+ for (int retry = 0; retry < retryNum; retry++) {
+ try (SyncConfigNodeIServiceClient client =
+ clientManager.borrowClient(nodeLocation.getInternalEndPoint())) {
+ TSStatus status = client.removeConfigNode(configNodeLocation);
+ while (status.getCode() ==
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ TimeUnit.MILLISECONDS.sleep(2000);
+ updateConfigNodeLeader(status);
+ try (SyncConfigNodeIServiceClient clientLeader =
+ clientManager.borrowClient(configNodeLeader)) {
+ status = clientLeader.removeConfigNode(configNodeLocation);
+ }
+ }
+ return status;
+ } catch (Exception e) {
+ LOGGER.warn("Remove ConfigNode failed, retrying...", e);
+ doRetryWait();
+ }
+ }
+ }
+
+ LOGGER.error("Remove ConfigNode failed");
+ return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+ .setMessage("All retry failed.");
+ }
+
+ /** Only use stopConfigNode when the ConfigNode is removed. */
+ public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) {
+ // TODO: Unified retry logic
+ for (int retry = 0; retry < retryNum; retry++) {
+ try (SyncConfigNodeIServiceClient client =
+
clientManager.borrowClient(configNodeLocation.getInternalEndPoint())) {
+ return client.stopConfigNode(configNodeLocation);
+ } catch (Exception e) {
+ LOGGER.warn("Stop ConfigNode failed, retrying...", e);
+ doRetryWait();
+ }
+ }
+ LOGGER.error("Stop ConfigNode failed");
+ return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+ .setMessage("All retry failed.");
+ }
+
private void doRetryWait() {
try {
TimeUnit.MILLISECONDS.sleep(100);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
new file mode 100644
index 0000000000..169e1509f5
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.conf;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+public class ConfigNodeRemoveCheck {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigNodeStartupCheck.class);
+
+ private static final ConfigNodeConfig conf =
ConfigNodeDescriptor.getInstance().getConf();
+
+ private final File systemPropertiesFile;
+ private final Properties systemProperties;
+
+ public ConfigNodeRemoveCheck() {
+ systemPropertiesFile =
+ new File(conf.getSystemDir() + File.separator +
ConfigNodeConstant.SYSTEM_FILE_NAME);
+ systemProperties = new Properties();
+ }
+
+ public TConfigNodeLocation removeCheck(TEndPoint endPoint) {
+ TConfigNodeLocation nodeLocation = new TConfigNodeLocation();
+ if (!systemPropertiesFile.exists()) {
+ LOGGER.error("The system properties file is not exists. IoTDB-ConfigNode
is shutdown.");
+ return nodeLocation;
+ }
+ try (FileInputStream inputStream = new
FileInputStream(systemPropertiesFile)) {
+ systemProperties.load(inputStream);
+ nodeLocation =
+ getConfigNodeList().stream()
+ .filter(e -> e.getInternalEndPoint().equals(endPoint))
+ .findFirst()
+ .get();
+ } catch (IOException | BadNodeUrlException e) {
+ LOGGER.error("Load system properties file failed.", e);
+ }
+
+ return nodeLocation;
+ }
+
+ public void removeConfigNode(TConfigNodeLocation nodeLocation)
+ throws BadNodeUrlException, IOException {
+ TSStatus status =
+
SyncConfigNodeClientPool.getInstance().removeConfigNode(getConfigNodeList(),
nodeLocation);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.error(status.getMessage());
+ throw new IOException("Remove ConfigNode failed:");
+ }
+ }
+
+ public List<TConfigNodeLocation> getConfigNodeList() throws
BadNodeUrlException {
+ return
NodeUrlUtils.parseTConfigNodeUrls(systemProperties.getProperty("confignode_list"));
+ }
+
+ public int getConsensusPort() {
+ return Integer.parseInt(systemProperties.getProperty("consensus_port"));
+ }
+
+ private static class ConfigNodeConfRemoveCheckHolder {
+
+ private static final ConfigNodeRemoveCheck INSTANCE = new
ConfigNodeRemoveCheck();
+
+ private ConfigNodeConfRemoveCheckHolder() {
+ // Empty constructor
+ }
+ }
+
+ public static ConfigNodeRemoveCheck getInstance() {
+ return ConfigNodeRemoveCheck.ConfigNodeConfRemoveCheckHolder.INSTANCE;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index c2f651792b..02cb72d761 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionReq;
import
org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -181,6 +182,9 @@ public abstract class ConfigRequest implements
IConsensusRequest {
case ApplyConfigNode:
req = new ApplyConfigNodeReq();
break;
+ case RemoveConfigNode:
+ req = new RemoveConfigNodeReq();
+ break;
case CreateFunction:
req = new CreateFunctionReq();
break;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index 6d61a0e0b2..9deadb7165 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -59,6 +59,7 @@ public enum ConfigRequestType {
ListUserRoles,
ListRoleUsers,
ApplyConfigNode,
+ RemoveConfigNode,
CreateFunction,
DropFunction,
GetNodePathsPartition,
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveConfigNodeReq.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveConfigNodeReq.java
new file mode 100644
index 0000000000..dc5f5c6058
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveConfigNodeReq.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class RemoveConfigNodeReq extends ConfigRequest {
+
+ private TConfigNodeLocation configNodeLocation;
+
+ public RemoveConfigNodeReq() {
+ super(ConfigRequestType.RemoveConfigNode);
+ }
+
+ public RemoveConfigNodeReq(TConfigNodeLocation configNodeLocation) {
+ this();
+ this.configNodeLocation = configNodeLocation;
+ }
+
+ public TConfigNodeLocation getConfigNodeLocation() {
+ return configNodeLocation;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(ConfigRequestType.RemoveConfigNode.ordinal(),
stream);
+
+
ThriftConfigNodeSerDeUtils.serializeTConfigNodeLocation(configNodeLocation,
stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ configNodeLocation =
ThriftConfigNodeSerDeUtils.deserializeTConfigNodeLocation(buffer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configNodeLocation);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RemoveConfigNodeReq that = (RemoveConfigNodeReq) o;
+ return configNodeLocation.equals(that.configNodeLocation);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index b2ed3a2881..782352587e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -73,6 +74,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -473,7 +475,9 @@ public class ConfigManager implements Manager {
if (getConsensusManager().isLeader()) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
+ Peer leader =
consensusManager.getLeader(nodeManager.getOnlineConfigNodes());
return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+ .setRedirectNode(leader.getEndpoint())
.setMessage(
"The current ConfigNode is not leader. And ConfigNodeGroup is in
leader election. Please redirect with a random ConfigNode.");
}
@@ -625,6 +629,16 @@ public class ConfigManager implements Manager {
return nodeManager.applyConfigNode(applyConfigNodeReq);
}
+ @Override
+ public TSStatus removeConfigNode(RemoveConfigNodeReq removeConfigNodeReq) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return nodeManager.removeConfigNode(removeConfigNodeReq);
+ } else {
+ return status;
+ }
+ }
+
@Override
public TSStatus createFunction(String udfName, String className,
List<String> uris) {
TSStatus status = confirmLeader();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 95c82d59d4..4ce8bdc813 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
@@ -144,6 +145,23 @@ public class ConsensusManager {
.isSuccess();
}
+ /**
+ * Remove a ConfigNode Peer out of PartitionRegion
+ *
+ * @param removeConfigNodeReq RemoveConfigNodeReq
+ * @return True if successfully removePeer. False if another ConfigNode is
being removed to the
+ * PartitionRegion
+ */
+ public boolean removeConfigNodePeer(RemoveConfigNodeReq removeConfigNodeReq)
{
+ return consensusImpl
+ .removePeer(
+ consensusGroupId,
+ new Peer(
+ consensusGroupId,
+
removeConfigNodeReq.getConfigNodeLocation().getConsensusEndPoint()))
+ .isSuccess();
+ }
+
/** Transmit PhysicalPlan to confignode.consensus.statemachine */
public ConsensusWriteResponse write(ConfigRequest req) {
return consensusImpl.write(consensusGroupId, req);
@@ -158,7 +176,22 @@ public class ConsensusManager {
return consensusImpl.isLeader(consensusGroupId);
}
+ public Peer getLeader(List<TConfigNodeLocation> onlineConfigNodes) {
+ Peer leader = consensusImpl.getLeader(consensusGroupId);
+
+ TConfigNodeLocation nodeLocation =
+ onlineConfigNodes.stream()
+ .filter(e -> e.getConsensusEndPoint().equals(leader.getEndpoint()))
+ .findFirst()
+ .get();
+ return new Peer(consensusGroupId, nodeLocation.getInternalEndPoint());
+ }
+
public ConsensusGroupId getConsensusGroupId() {
return consensusGroupId;
}
+
+ public IConsensus getConsensusImpl() {
+ return consensusImpl;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 80e7980ae7..668aac3cdc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetRegionLocationsReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -223,6 +224,13 @@ public interface Manager {
*/
TSStatus applyConfigNode(ApplyConfigNodeReq applyConfigNodeReq);
+ /**
+ * Remove ConfigNode
+ *
+ * @return status
+ */
+ TSStatus removeConfigNode(RemoveConfigNodeReq removeConfigNodeReq);
+
TSStatus createFunction(String udfName, String className, List<String> uris);
TSStatus dropFunction(String udfName);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index c4a0471522..bb85ebfd6a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.handlers.FlushHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.confignode.persistence.NodeInfo;
@@ -36,6 +38,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -47,6 +51,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
/** NodeManager manages cluster node addition and removal requests */
public class NodeManager {
@@ -56,12 +61,15 @@ public class NodeManager {
private final Manager configManager;
private final NodeInfo nodeInfo;
+ private final ReentrantLock removeConfigNodeLock;
+
/** TODO:do some operate after add node or remove node */
private final List<ChangeServerListener> listeners = new
CopyOnWriteArrayList<>();
public NodeManager(Manager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
this.nodeInfo = nodeInfo;
+ this.removeConfigNodeLock = new ReentrantLock();
}
private void setGlobalConfig(DataNodeConfigurationResp dataSet) {
@@ -188,6 +196,72 @@ public class NodeManager {
nodeInfo.addMetrics();
}
+ public TSStatus removeConfigNode(RemoveConfigNodeReq removeConfigNodeReq) {
+ if (removeConfigNodeLock.tryLock()) {
+ try {
+ // Check ConfigNodes number
+ if (getOnlineConfigNodes().size() <= 1) {
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage(
+ "Remove ConfigNode failed because there is only one
ConfigNode in current Cluster.");
+ }
+
+ // Check whether the onlineConfigNodes contain the ConfigNode to be
removed.
+ if
(!getOnlineConfigNodes().contains(removeConfigNodeReq.getConfigNodeLocation()))
{
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage(
+ "Remove ConfigNode failed because the ConfigNode not in
current Cluster.");
+ }
+
+ // Check whether the remove ConfigNode is leader
+ Peer leader = getConsensusManager().getLeader(getOnlineConfigNodes());
+ if (leader
+ .getEndpoint()
+
.equals(removeConfigNodeReq.getConfigNodeLocation().getInternalEndPoint())) {
+ // transfer leader
+ return transferLeader(removeConfigNodeReq,
getConsensusManager().getConsensusGroupId());
+ }
+
+ // Execute removePeer
+ if (getConsensusManager().removeConfigNodePeer(removeConfigNodeReq)) {
+ return getConsensusManager().write(removeConfigNodeReq).getStatus();
+ } else {
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage(
+ "Remove ConfigNode failed because update ConsensusGroup peer
information failed.");
+ }
+ } finally {
+ removeConfigNodeLock.unlock();
+ }
+ } else {
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage("A ConfigNode is removing. Please wait or try again.");
+ }
+ }
+
+ private TSStatus transferLeader(
+ RemoveConfigNodeReq removeConfigNodeReq, ConsensusGroupId groupId) {
+ TConfigNodeLocation newLeader =
+ getOnlineConfigNodes().stream()
+ .filter(e ->
!e.equals(removeConfigNodeReq.getConfigNodeLocation()))
+ .findAny()
+ .get();
+ ConsensusGenericResponse resp =
+ getConsensusManager()
+ .getConsensusImpl()
+ .transferLeader(groupId, new Peer(groupId,
newLeader.getConsensusEndPoint()));
+ if (!resp.isSuccess()) {
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage("Remove ConfigNode failed because transfer ConfigNode
leader failed.");
+ }
+ return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+ .setRedirectNode(newLeader.getInternalEndPoint())
+ .setMessage(
+ "The ConfigNode to be removed is leader, already transfer Leader
to "
+ + newLeader
+ + ".");
+ }
+
public List<TConfigNodeLocation> getOnlineConfigNodes() {
return nodeInfo.getOnlineConfigNodes();
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 583a45ad43..2ee1820d23 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
@@ -302,6 +303,34 @@ public class NodeInfo implements SnapshotProcessor {
return status;
}
+ /**
+ * Update ConfigNodeList both in memory and confignode-system.properties file
+ *
+ * @param removeConfigNodeReq RemoveConfigNodeReq
+ * @return REMOVE_CONFIGNODE_FAILED if remove online ConfigNode failed.
+ */
+ public TSStatus removeConfigNodeList(RemoveConfigNodeReq
removeConfigNodeReq) {
+ TSStatus status = new TSStatus();
+ configNodeInfoReadWriteLock.writeLock().lock();
+ try {
+ onlineConfigNodes.remove(removeConfigNodeReq.getConfigNodeLocation());
+ storeConfigNode();
+ LOGGER.info(
+ "Successfully remove ConfigNode: {}. Current ConfigNodeGroup: {}",
+ removeConfigNodeReq.getConfigNodeLocation(),
+ onlineConfigNodes);
+ status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (IOException e) {
+ LOGGER.error("Remove online ConfigNode failed.", e);
+ status.setCode(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode());
+ status.setMessage(
+ "Remove ConfigNode failed because current ConfigNode can't store
ConfigNode information.");
+ } finally {
+ configNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ return status;
+ }
+
private void storeConfigNode() throws IOException {
Properties systemProperties = new Properties();
try (FileInputStream inputStream = new
FileInputStream(systemPropertiesFile)) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index e524497b46..e861e010d4 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq
import org.apache.iotdb.confignode.consensus.request.write.DropFunctionReq;
import
org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -186,6 +187,8 @@ public class ConfigRequestExecutor {
return authorInfo.authorNonQuery((AuthorReq) req);
case ApplyConfigNode:
return nodeInfo.updateConfigNodeList((ApplyConfigNodeReq) req);
+ case RemoveConfigNode:
+ return nodeInfo.removeConfigNodeList((RemoveConfigNodeReq) req);
case CreateFunction:
return udfInfo.createFunction((CreateFunctionReq) req);
case DropFunction:
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index bcf5f20cfd..a2544eb558 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -18,15 +18,20 @@
*/
package org.apache.iotdb.confignode.service;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
import
org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
@@ -47,15 +52,19 @@ public class ConfigNode implements ConfigNodeMBean {
private final RegisterManager registerManager = new RegisterManager();
- private final ConfigNodeRPCService configNodeRPCService;
- private final ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor;
+ private ConfigNodeRPCService configNodeRPCService;
+ private ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor;
private ConfigManager configManager;
private ConfigNode() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ private void initConfigManager() {
// Init ConfigManager
try {
- this.configManager = new ConfigManager();
+ configManager = new ConfigManager();
} catch (IOException e) {
LOGGER.error("Can't start ConfigNode consensus group!", e);
try {
@@ -67,8 +76,8 @@ public class ConfigNode implements ConfigNodeMBean {
}
// Init RPC service
- this.configNodeRPCService = new ConfigNodeRPCService();
- this.configNodeRPCServiceProcessor = new
ConfigNodeRPCServiceProcessor(configManager);
+ configNodeRPCService = new ConfigNodeRPCService();
+ configNodeRPCServiceProcessor = new
ConfigNodeRPCServiceProcessor(configManager);
}
public static void main(String[] args) {
@@ -78,6 +87,9 @@ public class ConfigNode implements ConfigNodeMBean {
/** Register services */
private void setUp() throws StartupException, IOException {
LOGGER.info("Setting up {}...", ConfigNodeConstant.GLOBAL_NAME);
+ // Init ConfigManager
+ initConfigManager();
+
registerManager.register(new JMXService());
JMXService.registerMBean(this, mbeanName);
@@ -133,6 +145,29 @@ public class ConfigNode implements ConfigNodeMBean {
deactivate();
}
+ public void doRemoveNode(String[] args) throws IOException {
+ LOGGER.info("Starting to remove {}...", ConfigNodeConstant.GLOBAL_NAME);
+ if (args.length != 3) {
+ LOGGER.info("Usage: -r <ip>:<rpcPort>");
+ return;
+ }
+
+ try {
+ TEndPoint endPoint = NodeUrlUtils.parseTEndPointUrl(args[2]);
+ TConfigNodeLocation removeConfigNodeLocation =
+ ConfigNodeRemoveCheck.getInstance().removeCheck(endPoint);
+ if (removeConfigNodeLocation == null) {
+ LOGGER.error("The ConfigNode not in the Cluster.");
+ return;
+ }
+
+
ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
+ } catch (BadNodeUrlException e) {
+ LOGGER.warn("No ConfigNodes need to be removed.", e);
+ }
+ LOGGER.info("{} is removed.", ConfigNodeConstant.GLOBAL_NAME);
+ }
+
private static class ConfigNodeHolder {
private static final ConfigNode INSTANCE = new ConfigNode();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index c66e861274..d3e1a63b9b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -73,7 +73,13 @@ public class ConfigNodeCommandLine extends ServerCommandLine
{
}
ConfigNode.getInstance().active();
} else if (MODE_REMOVE.equals(mode)) {
- // TODO: remove node
+ // remove node
+ try {
+ ConfigNode.getInstance().doRemoveNode(args);
+ } catch (IOException e) {
+ LOGGER.error("Meet error when doing remove", e);
+ return -1;
+ }
}
return 0;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 6de969d879..e331534e5b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -25,8 +25,10 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
@@ -38,6 +40,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetRegionLocationsReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -84,6 +87,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.rpc.RpcUtils;
@@ -382,6 +387,62 @@ public class ConfigNodeRPCServiceProcessor implements
ConfigIService.Iface {
return status;
}
+ /**
+ * For leader to remove ConfigNode configuration in consensus layer
+ *
+ * @param configNodeLocation
+ * @return
+ */
+ @Override
+ public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation)
throws TException {
+ RemoveConfigNodeReq removeConfigNodeReq = new
RemoveConfigNodeReq(configNodeLocation);
+
+ TSStatus status = configManager.removeConfigNode(removeConfigNodeReq);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ status =
SyncConfigNodeClientPool.getInstance().stopConfigNode(configNodeLocation);
+ }
+
+ // Print log to record the ConfigNode that performs the
RemoveConfigNodeRequest
+ LOGGER.info("Execute RemoveConfigNodeRequest {} with result {}",
configNodeLocation, status);
+
+ return status;
+ }
+
+ /**
+ * For leader to stop ConfigNode
+ *
+ * @param configNodeLocation
+ * @return
+ */
+ @Override
+ public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation)
throws TException {
+ if
(!configManager.getNodeManager().getOnlineConfigNodes().contains(configNodeLocation))
{
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage("Stop ConfigNode failed because the ConfigNode not in
current Cluster.");
+ }
+
+ ConsensusGroupId groupId =
configManager.getConsensusManager().getConsensusGroupId();
+ ConsensusGenericResponse resp =
+
configManager.getConsensusManager().getConsensusImpl().removeConsensusGroup(groupId);
+ if (!resp.isSuccess()) {
+ return new
TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
+ .setMessage("Stop ConfigNode failed because remove ConsensusGroup
failed.");
+ }
+
+ new Thread(
+ () -> {
+ try {
+ ConfigNode.getInstance().stop();
+ System.exit(0);
+ } catch (IOException e) {
+ LOGGER.error("Meet error when stop ConfigNode!", e);
+ }
+ })
+ .start();
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .setMessage("Stop ConfigNode success.");
+ }
+
@Override
public TSStatus createFunction(TCreateFunctionReq req) {
return configManager.createFunction(req.getUdfName(), req.getClassName(),
req.getUris());
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 61a7381027..1bea05b173 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
@@ -482,6 +483,17 @@ public class ConfigRequestSerDeTest {
Assert.assertEquals(req0, req1);
}
+ @Test
+ public void removeConfigNodeReqTest() throws IOException {
+ RemoveConfigNodeReq req0 =
+ new RemoveConfigNodeReq(
+ new TConfigNodeLocation(
+ 0, new TEndPoint("0.0.0.0", 22277), new TEndPoint("0.0.0.0",
22278)));
+ RemoveConfigNodeReq req1 =
+ (RemoveConfigNodeReq)
ConfigRequest.Factory.create(req0.serializeToByteBuffer());
+ Assert.assertEquals(req0, req1);
+ }
+
@Test
public void updateProcedureTest() throws IOException {
DeleteStorageGroupProcedure procedure = new DeleteStorageGroupProcedure();
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 1dfaf9bb4e..b90107c068 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -637,6 +637,38 @@ public class ConfigNodeClient implements
ConfigIService.Iface, SyncThriftClient,
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation)
throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.removeConfigNode(configNodeLocation);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation)
throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.stopConfigNode(configNodeLocation);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus createFunction(TCreateFunctionReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f0bd4f94d7..47d6625146 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -132,7 +132,11 @@ public enum TSStatusCode {
STORAGE_GROUP_ALREADY_EXISTS(903),
NOT_ENOUGH_DATA_NODE(904),
ERROR_GLOBAL_CONFIG(905),
- APPLY_CONFIGNODE_FAILED(906);
+ APPLY_CONFIGNODE_FAILED(906),
+ REGISTER_CONFIGNODE_FAILED(907),
+ REMOVE_CONFIGNODE_FAILED(908),
+ REMOVE_CONFIGNODE_DUPLICATION(909),
+ STOP_CONOFIGNODE_FAILED(910);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 9f4eb968c1..76fffef2e5 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -304,6 +304,10 @@ service ConfigIService {
common.TSStatus applyConfigNode(common.TConfigNodeLocation
configNodeLocation)
+ common.TSStatus removeConfigNode(common.TConfigNodeLocation
configNodeLocation)
+
+ common.TSStatus stopConfigNode(common.TConfigNodeLocation configNodeLocation)
+
/* UDF */
common.TSStatus createFunction(TCreateFunctionReq req)