This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f55dfe07e5e Add more tests for removing datanode (#15749)
f55dfe07e5e is described below
commit f55dfe07e5eaecb9a3935c177579e6d018accd74
Author: Yongzao <[email protected]>
AuthorDate: Wed Jun 18 09:05:37 2025 +0800
Add more tests for removing datanode (#15749)
---
.../IoTDBRemoveConfigNodeITFramework.java | 2 +-
.../IoTDBRemoveDataNodeNormalIT.java | 362 ++++++++++++++++++++-
.../removedatanode/IoTDBRemoveDataNodeUtils.java | 180 ++++++++++
...work.java => IoTDBRemoveUnknownDataNodeIT.java} | 297 +++++++----------
.../confignode/it/utils/ConfigNodeTestUtils.java | 12 +
.../consensus/pipe/PipeConsensusServerImpl.java | 6 +-
6 files changed, 666 insertions(+), 193 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removeconfignode/IoTDBRemoveConfigNodeITFramework.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removeconfignode/IoTDBRemoveConfigNodeITFramework.java
index 8ba221a3233..6112fea7f52 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removeconfignode/IoTDBRemoveConfigNodeITFramework.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removeconfignode/IoTDBRemoveConfigNodeITFramework.java
@@ -46,7 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap;
-import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeITFramework.getConnectionWithSQLType;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.getConnectionWithSQLType;
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
public class IoTDBRemoveConfigNodeITFramework {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
index 6dbf5a155ac..0435629a911 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java
@@ -19,44 +19,376 @@
package org.apache.iotdb.confignode.it.removedatanode;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.exception.InconsistentDataException;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+import org.apache.iotdb.relational.it.query.old.aligned.TableUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.awaitUntilSuccess;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.generateRemoveString;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.getConnectionWithSQLType;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.restartDataNodes;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.selectRemoveDataNodes;
+import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
@Category({ClusterIT.class})
@RunWith(IoTDBTestRunner.class)
-public class IoTDBRemoveDataNodeNormalIT extends
IoTDBRemoveDataNodeITFramework {
+public class IoTDBRemoveDataNodeNormalIT {
- @Test
- public void success1C4DTest() throws Exception {
- successTest(2, 3, 1, 4, 1, 2, true, SQLModel.NOT_USE_SQL);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBRemoveDataNodeNormalIT.class);
+
+ private static final String SHOW_DATANODES = "show datanodes";
+
+ private static final String DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY =
"CUSTOM";
+ private static final String DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY =
"CUSTOM";
+
+ @Before
+ public void setUp() throws Exception {
+ // Set up the common environment configuration
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionGroupExtensionPolicy(DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY)
+
.setDataRegionGroupExtensionPolicy(DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY);
}
- @Test
- public void fail1C3DTest() throws Exception {
- failTest(2, 3, 1, 3, 1, 2, false, SQLModel.NOT_USE_SQL);
+ @After
+ public void tearDown() throws InterruptedException {
+ EnvFactory.getEnv().cleanClusterEnvironment();
}
+ // @Test
+ // public void success1C4DIoTTest() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, true, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void success1C4DIoTV2Test() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, true, SQLModel.NOT_USE_SQL,
+ // ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ // @Test
+ // public void fail1C3DIoTTest() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, false, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void fail1C3DIoTV2Test() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, false, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ // @Test
+ // public void success1C4DIoTTestUseSQL() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, true, SQLModel.TREE_MODEL_SQL,
+ // ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void success1C4DIoTV2TestUseSQL() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, true, SQLModel.TREE_MODEL_SQL,
+ // ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
@Test
- public void success1C4DTestUseSQL() throws Exception {
- successTest(2, 3, 1, 4, 1, 2, true, SQLModel.TREE_MODEL_SQL);
+ public void fail1C3DTestIoTUseSQL() throws Exception {
+ // Set up 1C3D with schema replication factor = 3 and remove 1D; this test should fail
+ // because too few DataNodes remain to hold the schema
+ failTest(2, 3, 1, 3, 1, 2, false, SQLModel.TREE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
}
@Test
- public void fail1C3DTestUseSQL() throws Exception {
- failTest(2, 3, 1, 3, 1, 2, false, SQLModel.TREE_MODEL_SQL);
+ public void fail1C3DTestIoTV2UseSQL() throws Exception {
+ // Set up 1C3D with schema replication factor = 3 and remove 1D; this test should fail
+ // because too few DataNodes remain to hold the schema
+ failTest(2, 3, 1, 3, 1, 2, false, SQLModel.TREE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
}
@Test
- public void success1C4DTestUseTableSQL() throws Exception {
- successTest(2, 3, 1, 4, 1, 2, true, SQLModel.TABLE_MODEL_SQL);
+ public void success1C4DIoTTestUseTableSQL() throws Exception {
+ // Set up 1C4D and remove 1D; this test should succeed
+ successTest(2, 3, 1, 4, 1, 2, true, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
}
@Test
- public void fail1C3DTestUseTableSQL() throws Exception {
- failTest(2, 3, 1, 3, 1, 2, false, SQLModel.TABLE_MODEL_SQL);
+ public void success1C4DIoTV2TestUseTableSQL() throws Exception {
+ // Set up 1C4D and remove 1D; this test should succeed
+ successTest(
+ 2, 3, 1, 4, 1, 2, true, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ }
+
+ // @Test
+ // public void fail1C3DIoTTestUseTableSQL() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, false, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void fail1C3DIoTV2TestUseTableSQL() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, false, SQLModel.TABLE_MODEL_SQL,
+ // ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ private void successTest(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ final int removeDataNodeNum,
+ final int dataRegionPerDataNode,
+ final boolean rejoinRemovedDataNode,
+ final SQLModel model,
+ final String dataRegionGroupConsensusProtocol)
+ throws Exception {
+ testRemoveDataNode(
+ dataReplicateFactor,
+ schemaReplicationFactor,
+ configNodeNum,
+ dataNodeNum,
+ removeDataNodeNum,
+ dataRegionPerDataNode,
+ true,
+ rejoinRemovedDataNode,
+ model,
+ dataRegionGroupConsensusProtocol);
+ }
+
+ private void failTest(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ final int removeDataNodeNum,
+ final int dataRegionPerDataNode,
+ final boolean rejoinRemovedDataNode,
+ final SQLModel model,
+ final String dataRegionGroupConsensusProtocol)
+ throws Exception {
+ testRemoveDataNode(
+ dataReplicateFactor,
+ schemaReplicationFactor,
+ configNodeNum,
+ dataNodeNum,
+ removeDataNodeNum,
+ dataRegionPerDataNode,
+ false,
+ rejoinRemovedDataNode,
+ model,
+ dataRegionGroupConsensusProtocol);
+ }
+
+ public void testRemoveDataNode(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ final int removeDataNodeNum,
+ final int dataRegionPerDataNode,
+ final boolean expectRemoveSuccess,
+ final boolean rejoinRemovedDataNode,
+ final SQLModel model,
+ final String dataRegionGroupConsensusProtocol)
+ throws Exception {
+ // Set up specific environment
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataRegionConsensusProtocolClass(dataRegionGroupConsensusProtocol)
+ .setSchemaReplicationFactor(schemaReplicationFactor)
+ .setDataReplicationFactor(dataReplicateFactor)
+ .setDefaultDataRegionGroupNumPerDatabase(
+ dataRegionPerDataNode * dataNodeNum / dataReplicateFactor);
+ EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
+
+ try (final Connection connection =
makeItCloseQuietly(getConnectionWithSQLType(model));
+ final Statement statement =
makeItCloseQuietly(connection.createStatement());
+ SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ if (SQLModel.TABLE_MODEL_SQL.equals(model)) {
+ // Insert data in table model
+ TableUtils.insertData();
+ } else {
+ // Insert data in tree model
+ ConfigNodeTestUtils.insertTreeModelData(statement);
+ }
+
+ Map<Integer, Set<Integer>> regionMap = getDataRegionMap(statement);
+ regionMap.forEach(
+ (key, valueSet) -> {
+ LOGGER.info("Key: {}, Value: {}", key, valueSet);
+ if (valueSet.size() != dataReplicateFactor) {
+ Assert.fail();
+ }
+ });
+
+ // Get all data nodes
+ ResultSet result = statement.executeQuery(SHOW_DATANODES);
+ Set<Integer> allDataNodeId = new HashSet<>();
+ while (result.next()) {
+ allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
+ }
+
+ // Select data nodes to remove
+ final Set<Integer> removeDataNodes =
selectRemoveDataNodes(allDataNodeId, removeDataNodeNum);
+
+ List<DataNodeWrapper> removeDataNodeWrappers =
+ removeDataNodes.stream()
+ .map(dataNodeId ->
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get())
+ .collect(Collectors.toList());
+
+ AtomicReference<SyncConfigNodeIServiceClient> clientRef = new
AtomicReference<>(client);
+ List<TDataNodeLocation> removeDataNodeLocations =
+ clientRef
+ .get()
+ .getDataNodeConfiguration(-1)
+ .getDataNodeConfigurationMap()
+ .values()
+ .stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .filter(location ->
removeDataNodes.contains(location.getDataNodeId()))
+ .collect(Collectors.toList());
+ if (SQLModel.NOT_USE_SQL.equals(model)) {
+ TDataNodeRemoveReq removeReq = new
TDataNodeRemoveReq(removeDataNodeLocations);
+
+ // Remove data nodes
+ TDataNodeRemoveResp removeResp =
clientRef.get().removeDataNode(removeReq);
+ LOGGER.info("Submit Remove DataNodes result {} ", removeResp);
+ if (removeResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (expectRemoveSuccess) {
+ LOGGER.error("Submit Remove DataNodes fail");
+ Assert.fail();
+ } else {
+ LOGGER.info("Submit Remove DataNodes fail, as expected.");
+ return;
+ }
+ }
+ LOGGER.info("Submit Remove DataNodes request: {}", removeReq);
+
+ } else {
+ String removeDataNodeSQL = generateRemoveString(removeDataNodes);
+ LOGGER.info("Remove DataNodes SQL: {}", removeDataNodeSQL);
+ try {
+ statement.execute(removeDataNodeSQL);
+ } catch (IoTDBSQLException e) {
+ if (expectRemoveSuccess) {
+ LOGGER.error("Remove DataNodes SQL execute fail: {}",
e.getMessage());
+ Assert.fail();
+ } else {
+ LOGGER.info("Submit Remove DataNodes fail, as expected");
+ return;
+ }
+ }
+ LOGGER.info("Remove DataNodes SQL submit successfully.");
+ }
+
+ // Wait until success
+ boolean removeSuccess = false;
+ try {
+ awaitUntilSuccess(clientRef, removeDataNodeLocations);
+ removeSuccess = true;
+ } catch (ConditionTimeoutException e) {
+ if (expectRemoveSuccess) {
+ LOGGER.error("Remove DataNodes timeout in 2 minutes");
+ Assert.fail();
+ }
+ }
+
+ if (!expectRemoveSuccess && removeSuccess) {
+ LOGGER.error("Remove DataNodes success, but expect fail");
+ Assert.fail();
+ }
+
+ LOGGER.info("Remove DataNodes success");
+
+ if (rejoinRemovedDataNode) {
+ try {
+ // Use sleep and restart to ensure that removeDataNodes restarts
successfully
+ Thread.sleep(30000);
+ restartDataNodes(removeDataNodeWrappers);
+ LOGGER.info("RemoveDataNodes:{} rejoined successfully.",
removeDataNodes);
+ } catch (Exception e) {
+ LOGGER.error("RemoveDataNodes rejoin failed.");
+ Assert.fail();
+ }
+ }
+ } catch (InconsistentDataException e) {
+ LOGGER.error("Unexpected error:", e);
+ }
+
+ try (final Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ final Statement statement =
makeItCloseQuietly(connection.createStatement())) {
+
+ // Check the data region distribution after removing data nodes
+ Map<Integer, Set<Integer>> afterRegionMap = getDataRegionMap(statement);
+ afterRegionMap.forEach(
+ (key, valueSet) -> {
+ LOGGER.info("Key: {}, Value: {}", key, valueSet);
+ if (valueSet.size() != dataReplicateFactor) {
+ Assert.fail();
+ }
+ });
+
+ if (rejoinRemovedDataNode) {
+ ResultSet result = statement.executeQuery(SHOW_DATANODES);
+ Set<Integer> allDataNodeId = new HashSet<>();
+ while (result.next()) {
+ allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
+ }
+ Assert.assertEquals(allDataNodeId.size(), dataNodeNum);
+ }
+ } catch (InconsistentDataException e) {
+ LOGGER.error("Unexpected error:", e);
+ }
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
new file mode 100644
index 00000000000..7728f74bacc
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.removedatanode;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+
+import org.apache.thrift.TException;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public class IoTDBRemoveDataNodeUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBRemoveDataNodeUtils.class);
+
+ public static String generateRemoveString(Set<Integer> dataNodes) {
+ StringBuilder sb = new StringBuilder("remove datanode ");
+
+ for (Integer node : dataNodes) {
+ sb.append(node).append(", ");
+ }
+
+ sb.setLength(sb.length() - 2);
+
+ return sb.toString();
+ }
+
+ public static Connection getConnectionWithSQLType(SQLModel model) throws
SQLException {
+ if (SQLModel.TABLE_MODEL_SQL.equals(model)) {
+ return EnvFactory.getEnv().getTableConnection();
+ } else {
+ return EnvFactory.getEnv().getConnection();
+ }
+ }
+
+ public static Set<Integer> selectRemoveDataNodes(
+ Set<Integer> allDataNodeId, int removeDataNodeNum) {
+ Set<Integer> removeDataNodeIds = new HashSet<>();
+ for (int i = 0; i < removeDataNodeNum; i++) {
+ int removeDataNodeId = allDataNodeId.iterator().next();
+ removeDataNodeIds.add(removeDataNodeId);
+ allDataNodeId.remove(removeDataNodeId);
+ }
+ return removeDataNodeIds;
+ }
+
+ public static void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
+ dataNodeWrappers.parallelStream()
+ .forEach(
+ nodeWrapper -> {
+ nodeWrapper.stopForcibly();
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .pollDelay(2, TimeUnit.SECONDS)
+ .until(() -> !nodeWrapper.isAlive());
+ LOGGER.info("Node {} stopped.", nodeWrapper.getId());
+ nodeWrapper.start();
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .pollDelay(2, TimeUnit.SECONDS)
+ .until(nodeWrapper::isAlive);
+ try {
+ TimeUnit.SECONDS.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ LOGGER.info("Node {} restarted.", nodeWrapper.getId());
+ });
+ }
+
+ public static void stopDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
+ dataNodeWrappers.parallelStream()
+ .forEach(
+ nodeWrapper -> {
+ nodeWrapper.stopForcibly();
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .pollDelay(2, TimeUnit.SECONDS)
+ .until(() -> !nodeWrapper.isAlive());
+ LOGGER.info("Node {} stopped.", nodeWrapper.getId());
+ });
+ }
+
+ public static void awaitUntilSuccess(
+ AtomicReference<SyncConfigNodeIServiceClient> clientRef,
+ List<TDataNodeLocation> removeDataNodeLocations) {
+ AtomicReference<List<TDataNodeLocation>> lastTimeDataNodeLocations = new
AtomicReference<>();
+ AtomicReference<Exception> lastException = new AtomicReference<>();
+
+ try {
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .pollDelay(2, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ try {
+ List<TDataNodeLocation> remainingDataNodes =
+ clientRef
+ .get()
+ .getDataNodeConfiguration(-1)
+ .getDataNodeConfigurationMap()
+ .values()
+ .stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList());
+ lastTimeDataNodeLocations.set(remainingDataNodes);
+ for (TDataNodeLocation location : removeDataNodeLocations) {
+ if (remainingDataNodes.contains(location)) {
+ return false;
+ }
+ }
+ return true;
+ } catch (TException e) {
+ clientRef.set(
+ (SyncConfigNodeIServiceClient)
+ EnvFactory.getEnv().getLeaderConfigNodeConnection());
+ lastException.set(e);
+ return false;
+ } catch (Exception e) {
+ // Any exception can be ignored
+ lastException.set(e);
+ return false;
+ }
+ });
+ } catch (ConditionTimeoutException e) {
+ if (lastTimeDataNodeLocations.get() == null) {
+ LOGGER.error(
+ "Maybe getDataNodeConfiguration fail, lastTimeDataNodeLocations is
null, last Exception:",
+ lastException.get());
+ throw e;
+ }
+ String actualSetStr = lastTimeDataNodeLocations.get().toString();
+ lastTimeDataNodeLocations.get().removeAll(removeDataNodeLocations);
+ String expectedSetStr = lastTimeDataNodeLocations.get().toString();
+ LOGGER.error(
+ "Remove DataNodes timeout in 2 minutes, expected set: {}, actual
set: {}",
+ expectedSetStr,
+ actualSetStr);
+ if (lastException.get() == null) {
+ LOGGER.info("No exception during awaiting");
+ } else {
+ LOGGER.error("Last exception during awaiting:", lastException.get());
+ }
+ throw e;
+ }
+
+ LOGGER.info("DataNodes has been successfully changed to {}",
lastTimeDataNodeLocations.get());
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
similarity index 56%
rename from
integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java
rename to
integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
index 583410d80c5..5bf1ebea06a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
@@ -23,62 +23,68 @@ import
org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.apache.iotdb.relational.it.query.old.aligned.TableUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
-import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.awaitUntilSuccess;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.generateRemoveString;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.getConnectionWithSQLType;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.selectRemoveDataNodes;
+import static
org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.stopDataNodes;
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
-public class IoTDBRemoveDataNodeITFramework {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBRemoveDataNodeITFramework.class);
- private static final String TREE_MODEL_INSERTION =
- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBRemoveUnknownDataNodeIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBRemoveUnknownDataNodeIT.class);
- private static final String SHOW_REGIONS = "show regions";
private static final String SHOW_DATANODES = "show datanodes";
- private static final String defaultSchemaRegionGroupExtensionPolicy =
"CUSTOM";
- private static final String defaultDataRegionGroupExtensionPolicy = "CUSTOM";
+ private static final String DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY =
"CUSTOM";
+ private static final String DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY =
"CUSTOM";
@Before
public void setUp() throws Exception {
+ // Set up the common environment configuration
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
-
.setSchemaRegionGroupExtensionPolicy(defaultSchemaRegionGroupExtensionPolicy)
-
.setDataRegionGroupExtensionPolicy(defaultDataRegionGroupExtensionPolicy);
+
.setSchemaRegionGroupExtensionPolicy(DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY)
+
.setDataRegionGroupExtensionPolicy(DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY);
}
@After
@@ -86,15 +92,97 @@ public class IoTDBRemoveDataNodeITFramework {
EnvFactory.getEnv().cleanClusterEnvironment();
}
- public void successTest(
+ // @Test
+ // public void success1C4DIoTTest() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void success1C4DIoTV2Test() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ // @Test
+ // public void fail1C3DIoTTest() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void fail1C3DIoTV2Test() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, SQLModel.NOT_USE_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ // @Test
+ // public void success1C4DIoTTestUseSQL() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, SQLModel.TREE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void success1C4DIoTV2TestUseSQL() throws Exception {
+ // // Setup 1C4D, and remove 1D, this test should success
+ // successTest(2, 3, 1, 4, 1, 2, SQLModel.TREE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ @Test
+ public void fail1C3DTestIoTUseSQL() throws Exception {
+ // Set up 1C3D with schema replication factor = 3 and remove 1D; this test should fail
+ // because too few DataNodes remain to hold the schema
+ failTest(2, 3, 1, 3, 1, 2, SQLModel.TREE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ }
+
+ @Test
+ public void fail1C3DTestIoTV2UseSQL() throws Exception {
+ // Set up 1C3D with schema replication factor = 3 and remove 1D; this test should fail
+ // because too few DataNodes remain to hold the schema
+ failTest(2, 3, 1, 3, 1, 2, SQLModel.TREE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ }
+
+ @Test
+ public void success1C4DIoTTestUseTableSQL() throws Exception {
+ // Set up 1C4D and remove 1D; this test should succeed
+ successTest(2, 3, 1, 4, 1, 2, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ }
+
+ @Test
+ public void success1C4DIoTV2TestUseTableSQL() throws Exception {
+ // Set up 1C4D and remove 1D; this test should succeed
+ successTest(2, 3, 1, 4, 1, 2, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ }
+
+ // @Test
+ // public void fail1C3DIoTTestUseTableSQL() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS);
+ // }
+ //
+ // @Test
+ // public void fail1C3DIoTV2TestUseTableSQL() throws Exception {
+ // // Setup 1C3D with schema replication factor = 3, and remove 1D, this
test should fail due
+ // to
+ // // insufficient DN for holding schema
+ // failTest(2, 3, 1, 3, 1, 2, SQLModel.TABLE_MODEL_SQL,
ConsensusFactory.IOT_CONSENSUS_V2);
+ // }
+
+ private void successTest(
final int dataReplicateFactor,
final int schemaReplicationFactor,
final int configNodeNum,
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode,
- final boolean rejoinRemovedDataNode,
- final SQLModel model)
+ final SQLModel model,
+ final String dataRegionGroupConsensusProtocol)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
@@ -104,19 +192,19 @@ public class IoTDBRemoveDataNodeITFramework {
removeDataNodeNum,
dataRegionPerDataNode,
true,
- rejoinRemovedDataNode,
- model);
+ model,
+ dataRegionGroupConsensusProtocol);
}
- public void failTest(
+ private void failTest(
final int dataReplicateFactor,
final int schemaReplicationFactor,
final int configNodeNum,
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode,
- final boolean rejoinRemovedDataNode,
- final SQLModel model)
+ final SQLModel model,
+ final String dataRegionGroupConsensusProtocol)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
@@ -126,8 +214,8 @@ public class IoTDBRemoveDataNodeITFramework {
removeDataNodeNum,
dataRegionPerDataNode,
false,
- rejoinRemovedDataNode,
- model);
+ model,
+ dataRegionGroupConsensusProtocol);
}
public void testRemoveDataNode(
@@ -138,13 +226,14 @@ public class IoTDBRemoveDataNodeITFramework {
final int removeDataNodeNum,
final int dataRegionPerDataNode,
final boolean expectRemoveSuccess,
- final boolean rejoinRemovedDataNode,
- final SQLModel model)
+ final SQLModel model,
+ final String dataRegionGroupConsensusProtocol)
throws Exception {
- // Set up the environment
+ // Set up specific environment
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
+ .setDataRegionConsensusProtocolClass(dataRegionGroupConsensusProtocol)
.setSchemaReplicationFactor(schemaReplicationFactor)
.setDataReplicationFactor(dataReplicateFactor)
.setDefaultDataRegionGroupNumPerDatabase(
@@ -161,7 +250,7 @@ public class IoTDBRemoveDataNodeITFramework {
TableUtils.insertData();
} else {
// Insert data in tree model
- statement.execute(TREE_MODEL_INSERTION);
+ ConfigNodeTestUtils.insertTreeModelData(statement);
}
Map<Integer, Set<Integer>> regionMap = getDataRegionMap(statement);
@@ -181,8 +270,7 @@ public class IoTDBRemoveDataNodeITFramework {
}
// Select data nodes to remove
- final Set<Integer> removeDataNodes =
- selectRemoveDataNodes(allDataNodeId, regionMap, removeDataNodeNum);
+ final Set<Integer> removeDataNodes = selectRemoveDataNodes(allDataNodeId, removeDataNodeNum);
List<DataNodeWrapper> removeDataNodeWrappers =
removeDataNodes.stream()
@@ -200,6 +288,11 @@ public class IoTDBRemoveDataNodeITFramework {
.map(TDataNodeConfiguration::getLocation)
.filter(location -> removeDataNodes.contains(location.getDataNodeId()))
.collect(Collectors.toList());
+
+ // Stop DataNodes before removing them
+ stopDataNodes(removeDataNodeWrappers);
+ LOGGER.info("RemoveDataNodes: {} are stopped.", removeDataNodes);
+
if (SQLModel.NOT_USE_SQL.equals(model)) {
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations);
@@ -252,18 +345,6 @@ public class IoTDBRemoveDataNodeITFramework {
}
LOGGER.info("Remove DataNodes success");
-
- if (rejoinRemovedDataNode) {
- try {
- // Use sleep and restart to ensure that removeDataNodes restarts successfully
- Thread.sleep(30000);
- restartDataNodes(removeDataNodeWrappers);
- LOGGER.info("RemoveDataNodes:{} rejoined successfully.", removeDataNodes);
- } catch (Exception e) {
- LOGGER.error("RemoveDataNodes rejoin failed.");
- Assert.fail();
- }
- }
} catch (InconsistentDataException e) {
LOGGER.error("Unexpected error:", e);
}
@@ -280,138 +361,8 @@ public class IoTDBRemoveDataNodeITFramework {
Assert.fail();
}
});
-
- if (rejoinRemovedDataNode) {
- ResultSet result = statement.executeQuery(SHOW_DATANODES);
- Set<Integer> allDataNodeId = new HashSet<>();
- while (result.next()) {
- allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
- }
- Assert.assertEquals(allDataNodeId.size(), dataNodeNum);
- }
} catch (InconsistentDataException e) {
LOGGER.error("Unexpected error:", e);
}
}
-
- private static Set<Integer> selectRemoveDataNodes(
- Set<Integer> allDataNodeId, Map<Integer, Set<Integer>> regionMap, int removeDataNodeNum) {
- Set<Integer> removeDataNodeIds = new HashSet<>();
- for (int i = 0; i < removeDataNodeNum; i++) {
- int removeDataNodeId = allDataNodeId.iterator().next();
- removeDataNodeIds.add(removeDataNodeId);
- allDataNodeId.remove(removeDataNodeId);
- }
- return removeDataNodeIds;
- }
-
- private static void awaitUntilSuccess(
- AtomicReference<SyncConfigNodeIServiceClient> clientRef,
- List<TDataNodeLocation> removeDataNodeLocations) {
- AtomicReference<List<TDataNodeLocation>> lastTimeDataNodeLocations = new AtomicReference<>();
- AtomicReference<Exception> lastException = new AtomicReference<>();
-
- try {
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .pollDelay(2, TimeUnit.SECONDS)
- .until(
- () -> {
- try {
- List<TDataNodeLocation> remainingDataNodes =
- clientRef
- .get()
- .getDataNodeConfiguration(-1)
- .getDataNodeConfigurationMap()
- .values()
- .stream()
- .map(TDataNodeConfiguration::getLocation)
- .collect(Collectors.toList());
- lastTimeDataNodeLocations.set(remainingDataNodes);
- for (TDataNodeLocation location : removeDataNodeLocations) {
- if (remainingDataNodes.contains(location)) {
- return false;
- }
- }
- return true;
- } catch (TException e) {
- clientRef.set(
- (SyncConfigNodeIServiceClient)
- EnvFactory.getEnv().getLeaderConfigNodeConnection());
- lastException.set(e);
- return false;
- } catch (Exception e) {
- // Any exception can be ignored
- lastException.set(e);
- return false;
- }
- });
- } catch (ConditionTimeoutException e) {
- if (lastTimeDataNodeLocations.get() == null) {
- LOGGER.error(
- "Maybe getDataNodeConfiguration fail, lastTimeDataNodeLocations is null, last Exception:",
- lastException.get());
- throw e;
- }
- String actualSetStr = lastTimeDataNodeLocations.get().toString();
- lastTimeDataNodeLocations.get().removeAll(removeDataNodeLocations);
- String expectedSetStr = lastTimeDataNodeLocations.get().toString();
- LOGGER.error(
- "Remove DataNodes timeout in 2 minutes, expected set: {}, actual set: {}",
- expectedSetStr,
- actualSetStr);
- if (lastException.get() == null) {
- LOGGER.info("No exception during awaiting");
- } else {
- LOGGER.error("Last exception during awaiting:", lastException.get());
- }
- throw e;
- }
-
- LOGGER.info("DataNodes has been successfully changed to {}", lastTimeDataNodeLocations.get());
- }
-
- public void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
- dataNodeWrappers.parallelStream()
- .forEach(
- nodeWrapper -> {
- nodeWrapper.stopForcibly();
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .pollDelay(2, TimeUnit.SECONDS)
- .until(() -> !nodeWrapper.isAlive());
- LOGGER.info("Node {} stopped.", nodeWrapper.getId());
- nodeWrapper.start();
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .pollDelay(2, TimeUnit.SECONDS)
- .until(nodeWrapper::isAlive);
- try {
- TimeUnit.SECONDS.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- LOGGER.info("Node {} restarted.", nodeWrapper.getId());
- });
- }
-
- public static String generateRemoveString(Set<Integer> dataNodes) {
- StringBuilder sb = new StringBuilder("remove datanode ");
-
- for (Integer node : dataNodes) {
- sb.append(node).append(", ");
- }
-
- sb.setLength(sb.length() - 2);
-
- return sb.toString();
- }
-
- public static Connection getConnectionWithSQLType(SQLModel model) throws SQLException {
- if (SQLModel.TABLE_MODEL_SQL.equals(model)) {
- return EnvFactory.getEnv().getTableConnection();
- } else {
- return EnvFactory.getEnv().getConnection();
- }
- }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 5f9da5ac36a..dfc37e52526 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -371,4 +373,14 @@ public class ConfigNodeTestUtils {
return new TDataNodeRestartReq(
clusterName, generateTDataNodeConfiguration(nodeId, dataNodeWrapper));
}
+
+ public static void insertTreeModelData(Statement statement) throws SQLException {
+ for (int i = 0; i < 1024; i++) {
+ statement.addBatch(
+ String.format(
+ "INSERT INTO root.sg.d%d(timestamp,speed,temperature) values(%d, %d, %d)",
+ i, i, i, i));
+ }
+ statement.executeBatch();
+ }
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 5fdd3a02a9e..2402d285a6e 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -581,8 +581,7 @@ public class PipeConsensusServerImpl {
}
private boolean isRemotePeerConsensusPipesTransmissionCompleted(
- Peer targetPeer, List<String> consensusPipeNames, boolean refreshCachedProgressIndex)
- throws ConsensusGroupModifyPeerException {
+ Peer targetPeer, List<String> consensusPipeNames, boolean refreshCachedProgressIndex) {
try (SyncPipeConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
TCheckConsensusPipeCompletedResp resp =
@@ -603,8 +602,7 @@ public class PipeConsensusServerImpl {
return resp.isCompleted;
} catch (Exception e) {
LOGGER.warn("{} cannot check consensus pipes transmission completed", thisNode, e);
- throw new ConsensusGroupModifyPeerException(
- String.format("%s cannot check consensus pipes transmission completed", thisNode), e);
+ return true;
}
}