This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bc94e5aee534b1910396f422ff6df5b4dd6a3b74
Author: Christofer Dutz <[email protected]>
AuthorDate: Tue Jul 30 07:17:48 2024 +0200

    Enhance remove-datanode function
    
    (cherry picked from commit 04ba236ef64a370bfefb762182a9c0b1363f8fee)
---
 .../main/java/org/apache/iotdb/rpc/UrlUtils.java   |   9 +-
 .../thrift/ConfigNodeRPCServiceHandler.java        |   8 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  48 +++--
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  | 164 ++++++++++++++++
 .../db/service/DataNodeServerCommandLine.java      | 145 ++++++++------
 .../db/service/DataNodeServerCommandLineTest.java  | 218 +++++++++++++++++++++
 6 files changed, 512 insertions(+), 80 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java
index a7994a8a520..39b2390a736 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/UrlUtils.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 /** The UrlUtils */
 public class UrlUtils {
-  private static final String POINT_COLON = ":";
+  private static final String PORT_SEPARATOR = ":";
   private static final String ABB_COLON = "[";
 
   private UrlUtils() {}
@@ -37,10 +37,11 @@ public class UrlUtils {
    */
   public static TEndPoint parseTEndPointIpv4AndIpv6Url(String endPointUrl) {
     TEndPoint endPoint = new TEndPoint();
-    if (endPointUrl.contains(POINT_COLON)) {
-      int point_position = endPointUrl.lastIndexOf(POINT_COLON);
-      String port = endPointUrl.substring(endPointUrl.lastIndexOf(POINT_COLON) 
+ 1);
+    if (endPointUrl.contains(PORT_SEPARATOR)) {
+      int point_position = endPointUrl.lastIndexOf(PORT_SEPARATOR);
+      String port = 
endPointUrl.substring(endPointUrl.lastIndexOf(PORT_SEPARATOR) + 1);
       String ip = endPointUrl.substring(0, point_position);
+      // If the ip/host part is provided as IPv6 address, cut off the 
surrounding square brackets.
       if (ip.contains(ABB_COLON)) {
         ip = ip.substring(1, ip.length() - 1);
       }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
index f9892f2ae62..abe4c03f861 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceHandler.java
@@ -27,7 +27,7 @@ import org.apache.thrift.transport.TTransport;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ConfigNodeRPCServiceHandler implements TServerEventHandler {
-  private AtomicLong thriftConnectionNumber = new AtomicLong(0);
+  private final AtomicLong thriftConnectionNumber = new AtomicLong(0);
 
   public ConfigNodeRPCServiceHandler() {
     MetricService.getInstance()
@@ -35,13 +35,13 @@ public class ConfigNodeRPCServiceHandler implements 
TServerEventHandler {
   }
 
   @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
+  public ServerContext createContext(TProtocol input, TProtocol output) {
     thriftConnectionNumber.incrementAndGet();
     return null;
   }
 
   @Override
-  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol 
arg2) {
+  public void deleteContext(ServerContext serverContext, TProtocol input, 
TProtocol output) {
     thriftConnectionNumber.decrementAndGet();
   }
 
@@ -51,7 +51,7 @@ public class ConfigNodeRPCServiceHandler implements 
TServerEventHandler {
   }
 
   @Override
-  public void processContext(ServerContext arg0, TTransport arg1, TTransport 
arg2) {
+  public void processContext(ServerContext serverContext, TTransport input, 
TTransport output) {
     // nothing
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5597610980f..fd7db9ce3ca 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
 import org.apache.iotdb.common.rpc.thrift.TShowTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -200,14 +201,26 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigNodeRPCServiceProcessor.class);
 
-  private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
-      ConfigNodeDescriptor.getInstance().getConf();
-
-  protected ConfigManager configManager;
-
-  protected ConfigNodeRPCServiceProcessor() {}
+  protected final CommonConfig commonConfig;
+  protected final ConfigNodeConfig configNodeConfig;
+  protected final ConfigNode configNode;
+  protected final ConfigManager configManager;
 
   public ConfigNodeRPCServiceProcessor(ConfigManager configManager) {
+    this.commonConfig = CommonDescriptor.getInstance().getConfig();
+    this.configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
+    this.configNode = ConfigNode.getInstance();
+    this.configManager = configManager;
+  }
+
+  public ConfigNodeRPCServiceProcessor(
+      CommonConfig commonConfig,
+      ConfigNodeConfig configNodeConfig,
+      ConfigNode configNode,
+      ConfigManager configManager) {
+    this.commonConfig = commonConfig;
+    this.configNodeConfig = configNodeConfig;
+    this.configNode = configNode;
     this.configManager = configManager;
   }
 
@@ -320,7 +333,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     if (isSystemDatabase) {
       databaseSchema.setSchemaReplicationFactor(1);
     } else if (!databaseSchema.isSetSchemaReplicationFactor()) {
-      
databaseSchema.setSchemaReplicationFactor(CONFIG_NODE_CONFIG.getSchemaReplicationFactor());
+      
databaseSchema.setSchemaReplicationFactor(configNodeConfig.getSchemaReplicationFactor());
     } else if (databaseSchema.getSchemaReplicationFactor() <= 0) {
       errorResp =
           new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
@@ -331,7 +344,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     if (isSystemDatabase) {
       databaseSchema.setDataReplicationFactor(1);
     } else if (!databaseSchema.isSetDataReplicationFactor()) {
-      
databaseSchema.setDataReplicationFactor(CONFIG_NODE_CONFIG.getDataReplicationFactor());
+      
databaseSchema.setDataReplicationFactor(configNodeConfig.getDataReplicationFactor());
     } else if (databaseSchema.getDataReplicationFactor() <= 0) {
       errorResp =
           new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
@@ -340,8 +353,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     }
 
     if (!databaseSchema.isSetTimePartitionOrigin()) {
-      databaseSchema.setTimePartitionOrigin(
-          CommonDescriptor.getInstance().getConfig().getTimePartitionOrigin());
+      
databaseSchema.setTimePartitionOrigin(commonConfig.getTimePartitionOrigin());
     } else if (databaseSchema.getTimePartitionOrigin() < 0) {
       errorResp =
           new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
@@ -350,8 +362,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     }
 
     if (!databaseSchema.isSetTimePartitionInterval()) {
-      databaseSchema.setTimePartitionInterval(
-          
CommonDescriptor.getInstance().getConfig().getTimePartitionInterval());
+      
databaseSchema.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
     } else if (databaseSchema.getTimePartitionInterval() <= 0) {
       errorResp =
           new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
@@ -363,7 +374,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
       databaseSchema.setMinSchemaRegionGroupNum(1);
     } else if (!databaseSchema.isSetMinSchemaRegionGroupNum()) {
       databaseSchema.setMinSchemaRegionGroupNum(
-          CONFIG_NODE_CONFIG.getDefaultSchemaRegionGroupNumPerDatabase());
+          configNodeConfig.getDefaultSchemaRegionGroupNumPerDatabase());
     } else if (databaseSchema.getMinSchemaRegionGroupNum() <= 0) {
       errorResp =
           new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
@@ -375,7 +386,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
       databaseSchema.setMinDataRegionGroupNum(1);
     } else if (!databaseSchema.isSetMinDataRegionGroupNum()) {
       databaseSchema.setMinDataRegionGroupNum(
-          CONFIG_NODE_CONFIG.getDefaultDataRegionGroupNumPerDatabase());
+          configNodeConfig.getDefaultDataRegionGroupNumPerDatabase());
     } else if (databaseSchema.getMinDataRegionGroupNum() <= 0) {
       errorResp =
           new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode())
@@ -690,7 +701,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     LOGGER.info(
         "{} has successfully started and joined the cluster: {}.",
         ConfigNodeConstant.GLOBAL_NAME,
-        ConfigNodeDescriptor.getInstance().getConf().getClusterName());
+        configNodeConfig.getClusterName());
     return StatusUtils.OK;
   }
 
@@ -735,6 +746,11 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   @Override
   public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) {
     new Thread(
+            // TODO: Perhaps we should find some other way of shutting down the
+            // config node; adding a hard dependency on the ConfigNode instance
+            // just to do this feels a bit odd. Dispatching a shutdown event
+            // that is processed where the instance is created would be
+            // cleaner.
             () -> {
               try {
                 // Sleep 1s before stop itself
@@ -743,7 +759,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
                 Thread.currentThread().interrupt();
                 LOGGER.warn(e.getMessage());
               } finally {
-                ConfigNode.getInstance().stop();
+                configNode.stop();
               }
             })
         .start();
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
new file mode 100644
index 00000000000..c4d993b5a79
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.service.thrift;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
+import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport;
+
+import junit.framework.TestCase;
+import org.apache.thrift.transport.TSocket;
+import org.junit.Assert;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Collections;
+
+public class ConfigNodeRPCServiceProcessorTest extends TestCase {
+
+  /**
+   * This test should be a normal data-node registration where a valid ip is 
used as address of the
+   * rpc-service. Nothing special should happen here.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testRegisterDataNode() throws Exception {
+    // Set up the system under test.
+    CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+    ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class);
+    ConfigNode configNode = Mockito.mock(ConfigNode.class);
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    DataNodeRegisterResp registerDataNodeResponse = new DataNodeRegisterResp();
+    registerDataNodeResponse.setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    registerDataNodeResponse.setConfigNodeList(
+        Collections.singletonList(new TConfigNodeLocation()));
+    registerDataNodeResponse.setDataNodeId(42);
+    registerDataNodeResponse.setRuntimeConfiguration(new 
TRuntimeConfiguration());
+    
Mockito.when(configManager.registerDataNode(Mockito.any(TDataNodeRegisterReq.class)))
+        .thenReturn(registerDataNodeResponse);
+    Socket socket = Mockito.mock(Socket.class);
+    Mockito.when(socket.getInetAddress())
+        .thenReturn(InetAddress.getByAddress(new byte[] {1, 2, 3, 4}));
+    TSocket tSocket = Mockito.mock(TSocket.class);
+    Mockito.when(tSocket.getSocket()).thenReturn(socket);
+    TimeoutChangeableTFastFramedTransport transport =
+        Mockito.mock(TimeoutChangeableTFastFramedTransport.class);
+    Mockito.when(transport.getSocket()).thenReturn(tSocket);
+    ConfigNodeRPCServiceProcessor sut =
+        new ConfigNodeRPCServiceProcessor(
+            commonConfig, configNodeConfig, configNode, configManager);
+
+    // Prepare the test input
+    TDataNodeLocation newDataNodeLocation = new TDataNodeLocation();
+    newDataNodeLocation.setDataNodeId(42);
+    newDataNodeLocation.setClientRpcEndPoint(new TEndPoint("1.2.3.4", 6667));
+    TDataNodeConfiguration newDataNodeConfiguration = new 
TDataNodeConfiguration();
+    newDataNodeConfiguration.setLocation(newDataNodeLocation);
+    TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+    req.setClusterName("test-cluster");
+    req.setDataNodeConfiguration(newDataNodeConfiguration);
+
+    // Execute the test logic
+    TDataNodeRegisterResp res = sut.registerDataNode(req);
+
+    // Check the result
+    
Assert.assertEquals(registerDataNodeResponse.convertToRpcDataNodeRegisterResp(),
 res);
+    // Check that the config manager was called to register a new node
+    ArgumentCaptor<TDataNodeRegisterReq> acRequest =
+        ArgumentCaptor.forClass(TDataNodeRegisterReq.class);
+    Mockito.verify(configManager, 
Mockito.times(1)).registerDataNode(acRequest.capture());
+    TDataNodeRegisterReq sentRequest = acRequest.getValue();
+    Assert.assertEquals(
+        "1.2.3.4",
+        
sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp());
+  }
+
+  /**
+   * This test should be a normal data-node restart where a valid ip is used 
as address of the
+   * rpc-service. Nothing special should happen here.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testRestartDataNode() throws Exception {
+    // Set up the system under test.
+    CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+    ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class);
+    ConfigNode configNode = Mockito.mock(ConfigNode.class);
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    TDataNodeRestartResp restartDataNodeResponse = new TDataNodeRestartResp();
+    restartDataNodeResponse.setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    restartDataNodeResponse.setConfigNodeList(Collections.singletonList(new 
TConfigNodeLocation()));
+    restartDataNodeResponse.setRuntimeConfiguration(new 
TRuntimeConfiguration());
+    
Mockito.when(configManager.restartDataNode(Mockito.any(TDataNodeRestartReq.class)))
+        .thenReturn(restartDataNodeResponse);
+    Socket socket = Mockito.mock(Socket.class);
+    Mockito.when(socket.getInetAddress())
+        .thenReturn(InetAddress.getByAddress(new byte[] {1, 2, 3, 4}));
+    TSocket tSocket = Mockito.mock(TSocket.class);
+    Mockito.when(tSocket.getSocket()).thenReturn(socket);
+    TimeoutChangeableTFastFramedTransport transport =
+        Mockito.mock(TimeoutChangeableTFastFramedTransport.class);
+    Mockito.when(transport.getSocket()).thenReturn(tSocket);
+    ConfigNodeRPCServiceProcessor sut =
+        new ConfigNodeRPCServiceProcessor(
+            commonConfig, configNodeConfig, configNode, configManager);
+
+    // Prepare the test input
+    TDataNodeLocation newDataNodeLocation = new TDataNodeLocation();
+    newDataNodeLocation.setDataNodeId(42);
+    newDataNodeLocation.setClientRpcEndPoint(new TEndPoint("1.2.3.4", 6667));
+    TDataNodeConfiguration newDataNodeConfiguration = new 
TDataNodeConfiguration();
+    newDataNodeConfiguration.setLocation(newDataNodeLocation);
+    TDataNodeRestartReq req = new TDataNodeRestartReq();
+    req.setClusterName("test-cluster");
+    req.setDataNodeConfiguration(newDataNodeConfiguration);
+
+    // Execute the test logic
+    TDataNodeRestartResp res = sut.restartDataNode(req);
+
+    // Check the result
+    Assert.assertEquals(restartDataNodeResponse, res);
+    // Check that the config manager was called to restart the data node
+    ArgumentCaptor<TDataNodeRestartReq> acRequest =
+        ArgumentCaptor.forClass(TDataNodeRestartReq.class);
+    Mockito.verify(configManager, 
Mockito.times(1)).restartDataNode(acRequest.capture());
+    TDataNodeRestartReq sentRequest = acRequest.getValue();
+    // The request was sent with the client rpc ip "1.2.3.4", so we expect the
+    // ConfigNodeRPCServiceProcessor to have passed this ip on unchanged.
+    Assert.assertEquals(
+        "1.2.3.4",
+        
sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index d6760111032..92f55888d83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -20,12 +20,12 @@ package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,8 +42,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.commons.lang3.StringUtils.isNumeric;
-
 public class DataNodeServerCommandLine extends ServerCommandLine {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeServerCommandLine.class);
@@ -53,12 +52,39 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
   // metaport-of-removed-node
   public static final String MODE_REMOVE = "-r";
 
+  private final ConfigNodeInfo configNodeInfo;
+  private final IClientManager<ConfigRegionId, ConfigNodeClient> 
configNodeClientManager;
+  private final DataNode dataNode;
+
   private static final String USAGE =
       "Usage: <-s|-r> "
           + "[-D{} <configure folder>] \n"
           + "-s: start the node to the cluster\n"
           + "-r: remove the node out of the cluster\n";
 
+  /** Default constructor using the singletons for initializing the 
relationship. */
+  public DataNodeServerCommandLine() {
+    configNodeInfo = ConfigNodeInfo.getInstance();
+    configNodeClientManager = ConfigNodeClientManager.getInstance();
+    dataNode = DataNode.getInstance();
+  }
+
+  /**
+   * Additional constructor allowing injection of custom instances (mainly for 
testing)
+   *
+   * @param configNodeInfo config node info
+   * @param configNodeClientManager config node client manager
+   * @param dataNode data node
+   */
+  public DataNodeServerCommandLine(
+      ConfigNodeInfo configNodeInfo,
+      IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager,
+      DataNode dataNode) {
+    this.configNodeInfo = configNodeInfo;
+    this.configNodeClientManager = configNodeClientManager;
+    this.dataNode = dataNode;
+  }
+
   @Override
   protected String getUsage() {
     return USAGE;
@@ -71,8 +97,6 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
       return -1;
     }
 
-    DataNode dataNode = DataNode.getInstance();
-
     String mode = args[0];
     LOGGER.info("Running mode {}", mode);
 
@@ -80,7 +104,7 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
     if (MODE_START.equals(mode)) {
       dataNode.doAddNode();
     } else if (MODE_REMOVE.equals(mode)) {
-      doRemoveDataNode(args);
+      return doRemoveDataNode(args);
     } else {
       LOGGER.error("Unrecognized mode {}", mode);
     }
@@ -88,22 +112,25 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
   }
 
   /**
-   * remove datanodes from cluster
+   * remove data-nodes from cluster
    *
    * @param args id or ip:rpc_port for removed datanode
    */
-  private void doRemoveDataNode(String[] args)
+  private int doRemoveDataNode(String[] args)
       throws BadNodeUrlException, TException, IoTDBException, 
ClientManagerException {
 
     if (args.length != 2) {
       LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
-      return;
+      return -1;
     }
 
+    // REMARK: No null or empty checks are needed for args[0] or args[1]: if they
+    // were empty, the JVM would not have received them.
+
     LOGGER.info("Starting to remove DataNode from cluster, parameter: {}, {}", 
args[0], args[1]);
 
     // Load ConfigNodeList from system.properties file
-    ConfigNodeInfo.getInstance().loadConfigNodeList();
+    configNodeInfo.loadConfigNodeList();
 
     List<TDataNodeLocation> dataNodeLocations = 
buildDataNodeLocations(args[1]);
     if (dataNodeLocations.isEmpty()) {
@@ -112,7 +139,7 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
     LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", 
dataNodeLocations);
     TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
     try (ConfigNodeClient configNodeClient =
-        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+        configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
       TDataNodeRemoveResp removeResp = 
configNodeClient.removeDataNode(removeReq);
       LOGGER.info("Remove result {} ", removeResp);
       if (removeResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -125,6 +152,7 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
               + "and after the process of removing datanode ends successfully, 
"
               + "you are supposed to delete directory and data of the 
removed-datanode manually");
     }
+    return 0;
   }
 
   /**
@@ -135,57 +163,62 @@ public class DataNodeServerCommandLine extends 
ServerCommandLine {
    */
   private List<TDataNodeLocation> buildDataNodeLocations(String args) {
     List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
-    if (args == null || args.trim().isEmpty()) {
-      return dataNodeLocations;
-    }
 
     // Now support only single datanode deletion
     if (args.split(",").length > 1) {
-      LOGGER.info("Incorrect input format, usage: <id>/<ip>:<rpc-port>");
-      return dataNodeLocations;
+      throw new IllegalArgumentException("Currently only removing single nodes 
is supported.");
     }
 
     // Below supports multiple datanode deletion, split by ',', and is 
reserved for extension
-    try {
-      List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args);
-      try (ConfigNodeClient client =
-          
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-        dataNodeLocations =
-            
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
-                .map(TDataNodeConfiguration::getLocation)
-                .filter(location -> 
endPoints.contains(location.getClientRpcEndPoint()))
-                .collect(Collectors.toList());
-      } catch (TException | ClientManagerException e) {
-        LOGGER.error("Get data node locations failed", e);
-      }
-    } catch (BadNodeUrlException e) {
-      try (ConfigNodeClient client =
-          
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-        for (String id : args.split(",")) {
-          if (!isNumeric(id)) {
-            LOGGER.warn("Incorrect id format {}, skipped...", id);
-            continue;
-          }
-          List<TDataNodeLocation> nodeLocationResult =
-              client
-                  .getDataNodeConfiguration(Integer.parseInt(id))
-                  .getDataNodeConfigurationMap()
-                  .values()
-                  .stream()
-                  .map(TDataNodeConfiguration::getLocation)
-                  .collect(Collectors.toList());
-          if (nodeLocationResult.isEmpty()) {
-            LOGGER.warn("DataNode {} is not in cluster, skipped...", id);
-            continue;
-          }
-          if (!dataNodeLocations.contains(nodeLocationResult.get(0))) {
-            dataNodeLocations.add(nodeLocationResult.get(0));
-          }
-        }
-      } catch (TException | ClientManagerException e1) {
-        LOGGER.error("Get data node locations failed", e);
-      }
+    List<NodeCoordinate> nodeCoordinates = parseCoordinates(args);
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
+      dataNodeLocations =
+          
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
+              .map(TDataNodeConfiguration::getLocation)
+              .filter(
+                  location ->
+                      nodeCoordinates.stream()
+                          .anyMatch(nodeCoordinate -> 
nodeCoordinate.matches(location)))
+              .collect(Collectors.toList());
+    } catch (TException | ClientManagerException e) {
+      LOGGER.error("Get data node locations failed", e);
     }
+
     return dataNodeLocations;
   }
+
+  protected List<NodeCoordinate> parseCoordinates(String coordinatesString) {
+    // Multiple nodeIds are separated by ","
+    String[] nodeIdStrings = coordinatesString.split(",");
+    List<NodeCoordinate> nodeIdCoordinates = new 
ArrayList<>(nodeIdStrings.length);
+    for (String nodeId : nodeIdStrings) {
+      // We expect each entry to be a numeric value referring to the node-id
+      if (NumberUtils.isCreatable(nodeId)) {
+        nodeIdCoordinates.add(new 
NodeCoordinateNodeId(Integer.parseInt(nodeId)));
+      } else {
+        LOGGER.error("Invalid format. Expected a numeric node id, but got: 
{}", nodeId);
+      }
+    }
+    return nodeIdCoordinates;
+  }
+
+  protected interface NodeCoordinate {
+    // Returns true if the given location matches this coordinate
+    boolean matches(TDataNodeLocation location);
+  }
+
+  /** Implementation of a NodeCoordinate that uses the node id to match. */
+  protected static class NodeCoordinateNodeId implements NodeCoordinate {
+    private final int nodeId;
+
+    public NodeCoordinateNodeId(int nodeId) {
+      this.nodeId = nodeId;
+    }
+
+    @Override
+    public boolean matches(TDataNodeLocation location) {
+      return location.isSetDataNodeId() && location.dataNodeId == nodeId;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeServerCommandLineTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeServerCommandLineTest.java
new file mode 100644
index 00000000000..ab37e6bc64a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeServerCommandLineTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Tests for {@link DataNodeServerCommandLine#run(String[])}: argument validation and the removal
+ * of data nodes that are addressed by their node id.
+ */
+public class DataNodeServerCommandLineTest extends TestCase {
+
+  // List of well known locations for this test
+  protected static final TDataNodeLocation LOCATION_1 =
+      new TDataNodeLocation(1, new TEndPoint("1.2.3.4", 6667), null, null, null, null);
+  protected static final TDataNodeLocation LOCATION_2 =
+      new TDataNodeLocation(2, new TEndPoint("1.2.3.5", 6667), null, null, null, null);
+  protected static final TDataNodeLocation LOCATION_3 =
+      new TDataNodeLocation(3, new TEndPoint("1.2.3.6", 6667), null, null, null, null);
+  // An invalid location: it is never part of the data node configuration the mocks return
+  protected static final TDataNodeLocation INVALID_LOCATION =
+      new TDataNodeLocation(23, new TEndPoint("4.3.2.1", 815), null, null, null, null);
+
+  /**
+   * Creates a mocked config node client whose getDataNodeConfiguration reports {@link
+   * #LOCATION_1}, {@link #LOCATION_2} and {@link #LOCATION_3} as the known data nodes.
+   *
+   * @return the mocked client
+   * @throws Exception only declared because the stubbed method declares checked exceptions.
+   */
+  private static ConfigNodeClient mockClientWithKnownDataNodes() throws Exception {
+    // This is the result of the getDataNodeConfiguration, which contains the list of known data
+    // nodes.
+    TDataNodeConfigurationResp tDataNodeConfigurationResp = new TDataNodeConfigurationResp();
+    tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
+        1, new TDataNodeConfiguration(LOCATION_1, new TNodeResource()));
+    tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
+        2, new TDataNodeConfiguration(LOCATION_2, new TNodeResource()));
+    tDataNodeConfigurationResp.putToDataNodeConfigurationMap(
+        3, new TDataNodeConfiguration(LOCATION_3, new TNodeResource()));
+    ConfigNodeClient client = Mockito.mock(ConfigNodeClient.class);
+    Mockito.when(client.getDataNodeConfiguration(Mockito.anyInt()))
+        .thenReturn(tDataNodeConfigurationResp);
+    return client;
+  }
+
+  /**
+   * Creates a mocked client manager that hands out the given client for every config region id.
+   *
+   * @param client the client that borrowClient should return
+   * @return the mocked client manager
+   * @throws Exception only declared because the stubbed method declares checked exceptions.
+   */
+  // Mockito.mock(Class) cannot carry the generic type arguments, so the assignment is unchecked.
+  @SuppressWarnings("unchecked")
+  private static IClientManager<ConfigRegionId, ConfigNodeClient> mockClientManagerFor(
+      ConfigNodeClient client) throws Exception {
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
+        Mockito.mock(IClientManager.class);
+    Mockito.when(configNodeClientManager.borrowClient(Mockito.any(ConfigRegionId.class)))
+        .thenReturn(client);
+    return configNodeClientManager;
+  }
+
+  /** Creates the response of a successful removeDataNode call. */
+  private static TDataNodeRemoveResp successfulRemoveResponse() {
+    return new TDataNodeRemoveResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  /**
+   * In this test we pass an empty args list to the command. This is expected to fail.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testNoArgs() throws Exception {
+    // No need to initialize these mocks with anything sensible, as they should never be used.
+    ConfigNodeInfo configNodeInfo = null;
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager = null;
+    DataNode dataNode = null;
+    DataNodeServerCommandLine sut =
+        new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
+
+    int returnCode = sut.run(new String[0]);
+
+    // We expect an error code of -1.
+    Assert.assertEquals(-1, returnCode);
+  }
+
+  /**
+   * In this test we pass too many arguments to the command. This should also fail with an error
+   * code.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testTooManyArgs() throws Exception {
+    // No need to initialize these mocks with anything sensible, as they should never be used.
+    ConfigNodeInfo configNodeInfo = null;
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager = null;
+    DataNode dataNode = null;
+    DataNodeServerCommandLine sut =
+        new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
+
+    int returnCode = sut.run(new String[] {"-r", "2", "-s"});
+
+    // We expect an error code of -1.
+    Assert.assertEquals(-1, returnCode);
+  }
+
+  /**
+   * In this test case we provide the coordinates for the data-node that we want to delete by
+   * providing the node-id of that node.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testSingleDataNodeRemoveById() throws Exception {
+    ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
+    ConfigNodeClient client = mockClientWithKnownDataNodes();
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
+        mockClientManagerFor(client);
+    // Only return something sensible, if exactly this location is asked to be deleted.
+    TDataNodeRemoveReq expectedRequest =
+        new TDataNodeRemoveReq(Collections.singletonList(LOCATION_2));
+    Mockito.when(client.removeDataNode(expectedRequest)).thenReturn(successfulRemoveResponse());
+    DataNode dataNode = Mockito.mock(DataNode.class);
+    DataNodeServerCommandLine sut =
+        new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
+
+    int returnCode = sut.run(new String[] {"-r", "2"});
+
+    // Check the overall return code was ok.
+    Assert.assertEquals(0, returnCode);
+    // Check that the config node client was actually called with a request to remove the
+    // node we want it to remove
+    Mockito.verify(client, Mockito.times(1)).removeDataNode(expectedRequest);
+  }
+
+  /**
+   * In this test case we provide the coordinates for the data-node that we want to delete by
+   * providing the node-id of that node. However, the coordinates are invalid and therefore the
+   * deletion fails with an error.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testSingleDataNodeRemoveByIdWithInvalidCoordinates() throws Exception {
+    ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
+    ConfigNodeClient client = mockClientWithKnownDataNodes();
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
+        mockClientManagerFor(client);
+    DataNode dataNode = Mockito.mock(DataNode.class);
+    DataNodeServerCommandLine sut =
+        new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
+
+    try {
+      // Node id 23 is not part of the data node configuration returned by the mocked client.
+      sut.run(new String[] {"-r", "23"});
+      Assert.fail("This call should have failed");
+    } catch (Exception e) {
+      // This is actually what we expected
+      Assert.assertTrue(e instanceof BadNodeUrlException);
+    }
+  }
+
+  /**
+   * In this test case we provide the coordinates for the data-node that we want to delete by
+   * providing the node-id of that node. NOTE: The test was prepared to test deletion of multiple
+   * nodes, however currently we don't support this.
+   *
+   * @throws Exception nothing should go wrong here.
+   */
+  public void testMultipleDataNodeRemoveById() throws Exception {
+    ConfigNodeInfo configNodeInfo = Mockito.mock(ConfigNodeInfo.class);
+    ConfigNodeClient client = mockClientWithKnownDataNodes();
+    IClientManager<ConfigRegionId, ConfigNodeClient> configNodeClientManager =
+        mockClientManagerFor(client);
+    // Only return something sensible, if exactly the locations we want are asked to be deleted.
+    Mockito.when(
+            client.removeDataNode(new TDataNodeRemoveReq(Arrays.asList(LOCATION_1, LOCATION_2))))
+        .thenReturn(successfulRemoveResponse());
+    DataNode dataNode = Mockito.mock(DataNode.class);
+    DataNodeServerCommandLine sut =
+        new DataNodeServerCommandLine(configNodeInfo, configNodeClientManager, dataNode);
+
+    try {
+      sut.run(new String[] {"-r", "1,2"});
+      Assert.fail("This call should have failed");
+    } catch (Exception e) {
+      // This is actually what we expected
+      Assert.assertTrue(e instanceof IllegalArgumentException);
+    }
+  }
+}


Reply via email to