This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b7bd5526b3f [Remove DataNode] Increase the waiting time for removing DN test (#15769)
b7bd5526b3f is described below

commit b7bd5526b3f88a019ab979497d63ffeb836f42e9
Author: Yongzao <[email protected]>
AuthorDate: Thu Jun 19 16:45:22 2025 +0800

    [Remove DataNode] Increase the waiting time for removing DN test (#15769)
---
 .../removedatanode/IoTDBRemoveDataNodeUtils.java   | 16 +++++------
 .../IoTDBRemoveUnknownDataNodeIT.java              | 31 ++++++++++++----------
 .../consensus/pipe/PipeConsensusServerImpl.java    | 13 +++++++--
 3 files changed, 35 insertions(+), 25 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
index 7728f74bacc..572f2030a4d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -66,13 +68,9 @@ public class IoTDBRemoveDataNodeUtils {
 
   public static Set<Integer> selectRemoveDataNodes(
       Set<Integer> allDataNodeId, int removeDataNodeNum) {
-    Set<Integer> removeDataNodeIds = new HashSet<>();
-    for (int i = 0; i < removeDataNodeNum; i++) {
-      int removeDataNodeId = allDataNodeId.iterator().next();
-      removeDataNodeIds.add(removeDataNodeId);
-      allDataNodeId.remove(removeDataNodeId);
-    }
-    return removeDataNodeIds;
+    List<Integer> shuffledDataNodeIds = new ArrayList<>(allDataNodeId);
+    Collections.shuffle(shuffledDataNodeIds);
+    return new HashSet<>(shuffledDataNodeIds.subList(0, removeDataNodeNum));
   }
 
   public static void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
@@ -120,7 +118,7 @@ public class IoTDBRemoveDataNodeUtils {
 
     try {
       Awaitility.await()
-          .atMost(2, TimeUnit.MINUTES)
+          .atMost(5, TimeUnit.MINUTES)
           .pollDelay(2, TimeUnit.SECONDS)
           .until(
               () -> {
@@ -164,7 +162,7 @@ public class IoTDBRemoveDataNodeUtils {
       lastTimeDataNodeLocations.get().removeAll(removeDataNodeLocations);
       String expectedSetStr = lastTimeDataNodeLocations.get().toString();
       LOGGER.error(
-          "Remove DataNodes timeout in 2 minutes, expected set: {}, actual 
set: {}",
+          "Remove DataNodes timeout in 5 minutes, expected set: {}, actual 
set: {}",
           expectedSetStr,
           actualSetStr);
       if (lastException.get() == null) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
index 5bf1ebea06a..8e2a5a70c4c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -240,6 +241,9 @@ public class IoTDBRemoveUnknownDataNodeIT {
             dataRegionPerDataNode * dataNodeNum / dataReplicateFactor);
     EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
 
+    final Set<Integer> removeDataNodes = new HashSet<>();
+    final List<TDataNodeLocation> removeDataNodeLocations = new ArrayList<>();
+
     try (final Connection connection = 
makeItCloseQuietly(getConnectionWithSQLType(model));
         final Statement statement = 
makeItCloseQuietly(connection.createStatement());
         SyncConfigNodeIServiceClient client =
@@ -269,16 +273,14 @@ public class IoTDBRemoveUnknownDataNodeIT {
         allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
       }
 
-      // Select data nodes to remove
-      final Set<Integer> removeDataNodes = 
selectRemoveDataNodes(allDataNodeId, removeDataNodeNum);
-
+      // Randomly select data nodes to remove
+      removeDataNodes.addAll(selectRemoveDataNodes(allDataNodeId, 
removeDataNodeNum));
       List<DataNodeWrapper> removeDataNodeWrappers =
           removeDataNodes.stream()
               .map(dataNodeId -> 
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get())
               .collect(Collectors.toList());
-
       AtomicReference<SyncConfigNodeIServiceClient> clientRef = new 
AtomicReference<>(client);
-      List<TDataNodeLocation> removeDataNodeLocations =
+      removeDataNodeLocations.addAll(
           clientRef
               .get()
               .getDataNodeConfiguration(-1)
@@ -287,15 +289,22 @@ public class IoTDBRemoveUnknownDataNodeIT {
               .stream()
               .map(TDataNodeConfiguration::getLocation)
               .filter(location -> 
removeDataNodes.contains(location.getDataNodeId()))
-              .collect(Collectors.toList());
-
+              .collect(Collectors.toList()));
       // Stop DataNodes before removing them
       stopDataNodes(removeDataNodeWrappers);
       LOGGER.info("RemoveDataNodes: {} are stopped.", removeDataNodes);
+    } catch (InconsistentDataException e) {
+      LOGGER.error("Unexpected error:", e);
+    }
 
+    // Establish a new connection after stopping data nodes
+    try (final Connection connection = 
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+        final Statement statement = 
makeItCloseQuietly(connection.createStatement());
+        SyncConfigNodeIServiceClient client =
+            (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      AtomicReference<SyncConfigNodeIServiceClient> clientRef = new 
AtomicReference<>(client);
       if (SQLModel.NOT_USE_SQL.equals(model)) {
         TDataNodeRemoveReq removeReq = new 
TDataNodeRemoveReq(removeDataNodeLocations);
-
         // Remove data nodes
         TDataNodeRemoveResp removeResp = 
clientRef.get().removeDataNode(removeReq);
         LOGGER.info("Submit Remove DataNodes result {} ", removeResp);
@@ -345,12 +354,6 @@ public class IoTDBRemoveUnknownDataNodeIT {
       }
 
       LOGGER.info("Remove DataNodes success");
-    } catch (InconsistentDataException e) {
-      LOGGER.error("Unexpected error:", e);
-    }
-
-    try (final Connection connection = 
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
-        final Statement statement = 
makeItCloseQuietly(connection.createStatement())) {
 
       // Check the data region distribution after removing data nodes
       Map<Integer, Set<Integer>> afterRegionMap = getDataRegionMap(statement);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 2402d285a6e..23030b89bd4 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -384,7 +384,15 @@ public class PipeConsensusServerImpl {
             String.format("error when set peer %s to active %s", peer, 
isActive), e);
       }
     } catch (ClientManagerException e) {
-      throw new ConsensusGroupModifyPeerException(e);
+      if (isForDeletionPurpose) {
+        // for remove peer, if target peer is already down, we can skip this 
step.
+        LOGGER.warn(
+            "target peer may be down, error when set peer {} to active {}", 
peer, isActive, e);
+      } else {
+        // for add peer, if target peer is down, we need to throw exception to 
identify the failure
+        // of this addPeerProcedure.
+        throw new ConsensusGroupModifyPeerException(e);
+      }
     }
   }
 
@@ -653,7 +661,8 @@ public class PipeConsensusServerImpl {
         Thread.sleep(checkIntervalInMs);
       }
     } catch (ClientManagerException | TException e) {
-      throw new ConsensusGroupModifyPeerException(
+      // in case of target peer is down or can not serve, we simply skip it.
+      LOGGER.warn(
           String.format(
               "error when waiting %s to release all region related resource. 
%s",
               targetPeer, e.getMessage()),

Reply via email to