This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b7bd5526b3f [Remove DataNode] Increase the waiting time for removing DN test (#15769)
b7bd5526b3f is described below
commit b7bd5526b3f88a019ab979497d63ffeb836f42e9
Author: Yongzao <[email protected]>
AuthorDate: Thu Jun 19 16:45:22 2025 +0800
[Remove DataNode] Increase the waiting time for removing DN test (#15769)
---
.../removedatanode/IoTDBRemoveDataNodeUtils.java | 16 +++++------
.../IoTDBRemoveUnknownDataNodeIT.java | 31 ++++++++++++----------
.../consensus/pipe/PipeConsensusServerImpl.java | 13 +++++++--
3 files changed, 35 insertions(+), 25 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
index 7728f74bacc..572f2030a4d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeUtils.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -66,13 +68,9 @@ public class IoTDBRemoveDataNodeUtils {
public static Set<Integer> selectRemoveDataNodes(
Set<Integer> allDataNodeId, int removeDataNodeNum) {
- Set<Integer> removeDataNodeIds = new HashSet<>();
- for (int i = 0; i < removeDataNodeNum; i++) {
- int removeDataNodeId = allDataNodeId.iterator().next();
- removeDataNodeIds.add(removeDataNodeId);
- allDataNodeId.remove(removeDataNodeId);
- }
- return removeDataNodeIds;
+ List<Integer> shuffledDataNodeIds = new ArrayList<>(allDataNodeId);
+ Collections.shuffle(shuffledDataNodeIds);
+ return new HashSet<>(shuffledDataNodeIds.subList(0, removeDataNodeNum));
}
public static void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
@@ -120,7 +118,7 @@ public class IoTDBRemoveDataNodeUtils {
try {
Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
+ .atMost(5, TimeUnit.MINUTES)
.pollDelay(2, TimeUnit.SECONDS)
.until(
() -> {
@@ -164,7 +162,7 @@ public class IoTDBRemoveDataNodeUtils {
lastTimeDataNodeLocations.get().removeAll(removeDataNodeLocations);
String expectedSetStr = lastTimeDataNodeLocations.get().toString();
LOGGER.error(
- "Remove DataNodes timeout in 2 minutes, expected set: {}, actual
set: {}",
+ "Remove DataNodes timeout in 5 minutes, expected set: {}, actual
set: {}",
expectedSetStr,
actualSetStr);
if (lastException.get() == null) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
index 5bf1ebea06a..8e2a5a70c4c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveUnknownDataNodeIT.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -240,6 +241,9 @@ public class IoTDBRemoveUnknownDataNodeIT {
dataRegionPerDataNode * dataNodeNum / dataReplicateFactor);
EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
+ final Set<Integer> removeDataNodes = new HashSet<>();
+ final List<TDataNodeLocation> removeDataNodeLocations = new ArrayList<>();
+
try (final Connection connection =
makeItCloseQuietly(getConnectionWithSQLType(model));
final Statement statement =
makeItCloseQuietly(connection.createStatement());
SyncConfigNodeIServiceClient client =
@@ -269,16 +273,14 @@ public class IoTDBRemoveUnknownDataNodeIT {
allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
}
- // Select data nodes to remove
- final Set<Integer> removeDataNodes =
selectRemoveDataNodes(allDataNodeId, removeDataNodeNum);
-
+ // Randomly select data nodes to remove
+ removeDataNodes.addAll(selectRemoveDataNodes(allDataNodeId,
removeDataNodeNum));
List<DataNodeWrapper> removeDataNodeWrappers =
removeDataNodes.stream()
.map(dataNodeId ->
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get())
.collect(Collectors.toList());
-
AtomicReference<SyncConfigNodeIServiceClient> clientRef = new
AtomicReference<>(client);
- List<TDataNodeLocation> removeDataNodeLocations =
+ removeDataNodeLocations.addAll(
clientRef
.get()
.getDataNodeConfiguration(-1)
@@ -287,15 +289,22 @@ public class IoTDBRemoveUnknownDataNodeIT {
.stream()
.map(TDataNodeConfiguration::getLocation)
.filter(location ->
removeDataNodes.contains(location.getDataNodeId()))
- .collect(Collectors.toList());
-
+ .collect(Collectors.toList()));
// Stop DataNodes before removing them
stopDataNodes(removeDataNodeWrappers);
LOGGER.info("RemoveDataNodes: {} are stopped.", removeDataNodes);
+ } catch (InconsistentDataException e) {
+ LOGGER.error("Unexpected error:", e);
+ }
+ // Establish a new connection after stopping data nodes
+ try (final Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ final Statement statement =
makeItCloseQuietly(connection.createStatement());
+ SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ AtomicReference<SyncConfigNodeIServiceClient> clientRef = new
AtomicReference<>(client);
if (SQLModel.NOT_USE_SQL.equals(model)) {
TDataNodeRemoveReq removeReq = new
TDataNodeRemoveReq(removeDataNodeLocations);
-
// Remove data nodes
TDataNodeRemoveResp removeResp =
clientRef.get().removeDataNode(removeReq);
LOGGER.info("Submit Remove DataNodes result {} ", removeResp);
@@ -345,12 +354,6 @@ public class IoTDBRemoveUnknownDataNodeIT {
}
LOGGER.info("Remove DataNodes success");
- } catch (InconsistentDataException e) {
- LOGGER.error("Unexpected error:", e);
- }
-
- try (final Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
- final Statement statement =
makeItCloseQuietly(connection.createStatement())) {
// Check the data region distribution after removing data nodes
Map<Integer, Set<Integer>> afterRegionMap = getDataRegionMap(statement);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 2402d285a6e..23030b89bd4 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -384,7 +384,15 @@ public class PipeConsensusServerImpl {
String.format("error when set peer %s to active %s", peer,
isActive), e);
}
} catch (ClientManagerException e) {
- throw new ConsensusGroupModifyPeerException(e);
+ if (isForDeletionPurpose) {
+ // for remove peer, if target peer is already down, we can skip this
step.
+ LOGGER.warn(
+ "target peer may be down, error when set peer {} to active {}",
peer, isActive, e);
+ } else {
+ // for add peer, if target peer is down, we need to throw exception to
identify the failure
+ // of this addPeerProcedure.
+ throw new ConsensusGroupModifyPeerException(e);
+ }
}
}
@@ -653,7 +661,8 @@ public class PipeConsensusServerImpl {
Thread.sleep(checkIntervalInMs);
}
} catch (ClientManagerException | TException e) {
- throw new ConsensusGroupModifyPeerException(
+ // in case of target peer is down or can not serve, we simply skip it.
+ LOGGER.warn(
String.format(
"error when waiting %s to release all region related resource.
%s",
targetPeer, e.getMessage()),