This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f5d2cd82177 Pipe IT: Stabilize leader stop test (#17809)
f5d2cd82177 is described below
commit f5d2cd821775de2e80795f6f48a568c0e6ce132e
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 18:53:19 2026 +0800
Pipe IT: Stabilize leader stop test (#17809)
* Pipe IT: Stabilize table model leader stop test
* Pipe IT: Stabilize tree model leader stop test
---
.../manual/enhanced/IoTDBPipeClusterIT.java | 28 ++++++++++++++++------
.../auto/enhanced/IoTDBPipeClusterIT.java | 28 ++++++++++++++++------
2 files changed, 42 insertions(+), 14 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 244a1ff0c83..27297c06471 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -313,6 +313,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.insertData("test", "test", 100, 200, senderEnv);
TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
+ // Avoid electing a stale follower after stopping the current test1
leader.
+ flushTableDataRegionReplicasAfterReplicationComplete(
+ senderEnv, Collections.singletonList("test1"));
final int leaderIndex = restartTableDataRegionLeader(client,
"test1");
if (leaderIndex == -1) { // ensure the leader is stopped
@@ -324,7 +327,10 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
TableModelUtils.assertData("test", "test", 0, 300, receiverEnv,
handleFailure);
- waitForTableDataRegionReplicationComplete(Arrays.asList("test",
"test1"));
+ flushTableDataRegionReplicasAfterReplicationComplete(
+ senderEnv, Arrays.asList("test", "test1"));
+ flushTableDataRegionReplicasAfterReplicationComplete(
+ receiverEnv, Collections.singletonList("test"));
}
try {
@@ -428,14 +434,22 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
return -1;
}
- private void waitForTableDataRegionReplicationComplete(final List<String>
databases) {
+ private void flushTableDataRegionReplicasAfterReplicationComplete(
+ final BaseEnv env, final List<String> databases) {
+ waitForTableDataRegionReplicationComplete(env, databases);
+ TestUtils.executeNonQueryWithRetry(env, "flush");
+ waitForTableDataRegionReplicationComplete(env, databases);
+ }
+
+ private void waitForTableDataRegionReplicationComplete(
+ final BaseEnv env, final List<String> databases) {
await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() -> {
try (final SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient)
env.getLeaderConfigNodeConnection()) {
final List<TRegionInfo> leaderRegionInfoList =
showTableDataRegionLeaders(databases, client);
Assert.assertFalse(
@@ -444,14 +458,14 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
for (final TRegionInfo regionInfo : leaderRegionInfoList) {
final DataNodeWrapper leaderNode =
- findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+ findDataNodeWrapperByPort(env,
regionInfo.getClientRpcPort());
final String metricsUrl =
"http://"
+ leaderNode.getIp()
+ ":"
+ leaderNode.getMetricPort()
+ "/metrics";
- final String metricsContent =
senderEnv.getUrlContent(metricsUrl, null);
+ final String metricsContent = env.getUrlContent(metricsUrl,
null);
Assert.assertNotNull(
"Failed to fetch metrics from leader DataNode at " +
metricsUrl,
metricsContent);
@@ -480,8 +494,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
return result;
}
- private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
- for (final DataNodeWrapper dataNodeWrapper :
senderEnv.getDataNodeWrapperList()) {
+ private DataNodeWrapper findDataNodeWrapperByPort(final BaseEnv env, final
int port) {
+ for (final DataNodeWrapper dataNodeWrapper : env.getDataNodeWrapperList())
{
if (dataNodeWrapper.getPort() == port) {
return dataNodeWrapper;
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index 48c2f7a0308..22b6d07f44a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -328,6 +329,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
senderEnv,
Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)",
"flush"),
null);
+ flushTreeDataRegionReplicasAfterReplicationComplete(
+ senderEnv, Collections.singletonList("root.db"));
final int leaderIndex = restartTreeDataRegionLeader(client, "root.db");
if (leaderIndex == -1) { // ensure the leader is stopped
@@ -344,7 +347,10 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
"select count(*) from root.db.d1",
"count(root.db.d1.s1),",
Collections.singleton("2,"));
-
waitForTreeDataRegionReplicationComplete(Collections.singletonList("root.db"));
+ flushTreeDataRegionReplicasAfterReplicationComplete(
+ senderEnv, Collections.singletonList("root.db"));
+ flushTreeDataRegionReplicasAfterReplicationComplete(
+ receiverEnv, Collections.singletonList("root.db"));
}
try {
@@ -441,14 +447,22 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
return -1;
}
- private void waitForTreeDataRegionReplicationComplete(final List<String>
databases) {
+ private void flushTreeDataRegionReplicasAfterReplicationComplete(
+ final BaseEnv env, final List<String> databases) {
+ waitForTreeDataRegionReplicationComplete(env, databases);
+ TestUtils.executeNonQueryWithRetry(env, "flush");
+ waitForTreeDataRegionReplicationComplete(env, databases);
+ }
+
+ private void waitForTreeDataRegionReplicationComplete(
+ final BaseEnv env, final List<String> databases) {
await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() -> {
try (final SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient)
env.getLeaderConfigNodeConnection()) {
final List<TRegionInfo> leaderRegionInfoList =
showTreeDataRegionLeaders(databases, client);
Assert.assertFalse(
@@ -457,14 +471,14 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
for (final TRegionInfo regionInfo : leaderRegionInfoList) {
final DataNodeWrapper leaderNode =
- findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+ findDataNodeWrapperByPort(env,
regionInfo.getClientRpcPort());
final String metricsUrl =
"http://"
+ leaderNode.getIp()
+ ":"
+ leaderNode.getMetricPort()
+ "/metrics";
- final String metricsContent =
senderEnv.getUrlContent(metricsUrl, null);
+ final String metricsContent = env.getUrlContent(metricsUrl,
null);
Assert.assertNotNull(
"Failed to fetch metrics from leader DataNode at " +
metricsUrl,
metricsContent);
@@ -492,8 +506,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
return result;
}
- private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
- for (final DataNodeWrapper dataNodeWrapper :
senderEnv.getDataNodeWrapperList()) {
+ private DataNodeWrapper findDataNodeWrapperByPort(final BaseEnv env, final
int port) {
+ for (final DataNodeWrapper dataNodeWrapper : env.getDataNodeWrapperList())
{
if (dataNodeWrapper.getPort() == port) {
return dataNodeWrapper;
}