This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 23d1367f6d1 Pipe IT: Stabilize tree model leader stop test (#17825)
23d1367f6d1 is described below

commit 23d1367f6d167916a7529a316a7fcadc1dd960e7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 17:28:26 2026 +0800

    Pipe IT: Stabilize tree model leader stop test (#17825)
---
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 96ec056cd6c..16e21fdb295 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -315,6 +316,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
 
       TestUtils.executeNonQueries(
           senderEnv, Arrays.asList("insert into root.db.d1(time,s1) values 
(1,1)", "flush"), null);
+      flushTreeDataRegionReplicasAfterReplicationComplete(
+          senderEnv, Collections.singletonList("root.db"));
 
       final int leaderIndex = restartTreeDataRegionLeader(client, "root.db");
       if (leaderIndex == -1) { // ensure the leader is stopped
@@ -333,7 +336,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
           Collections.singleton("2,"));
-      
waitForTreeDataRegionReplicationComplete(Collections.singletonList("root.db"));
+      flushTreeDataRegionReplicasAfterReplicationComplete(
+          senderEnv, Collections.singletonList("root.db"));
+      flushTreeDataRegionReplicasAfterReplicationComplete(
+          receiverEnv, Collections.singletonList("root.db"));
     }
 
     try {
@@ -427,14 +433,22 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
     return -1;
   }
 
-  private void waitForTreeDataRegionReplicationComplete(final List<String> 
databases) {
+  private void flushTreeDataRegionReplicasAfterReplicationComplete(
+      final BaseEnv env, final List<String> databases) {
+    waitForTreeDataRegionReplicationComplete(env, databases);
+    TestUtils.executeNonQueryWithRetry(env, "flush");
+    waitForTreeDataRegionReplicationComplete(env, databases);
+  }
+
+  private void waitForTreeDataRegionReplicationComplete(
+      final BaseEnv env, final List<String> databases) {
     await()
         .pollInterval(500, TimeUnit.MILLISECONDS)
         .atMost(2, TimeUnit.MINUTES)
         .untilAsserted(
             () -> {
               try (final SyncConfigNodeIServiceClient client =
-                  (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+                  (SyncConfigNodeIServiceClient) 
env.getLeaderConfigNodeConnection()) {
                 final List<TRegionInfo> leaderRegionInfoList =
                     showTreeDataRegionLeaders(databases, client);
                 Assert.assertFalse(
@@ -443,14 +457,14 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
 
                 for (final TRegionInfo regionInfo : leaderRegionInfoList) {
                   final DataNodeWrapper leaderNode =
-                      findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+                      findDataNodeWrapperByPort(env, 
regionInfo.getClientRpcPort());
                   final String metricsUrl =
                       "http://";
                           + leaderNode.getIp()
                           + ":"
                           + leaderNode.getMetricPort()
                           + "/metrics";
-                  final String metricsContent = 
senderEnv.getUrlContent(metricsUrl, null);
+                  final String metricsContent = env.getUrlContent(metricsUrl, 
null);
                   Assert.assertNotNull(
                       "Failed to fetch metrics from leader DataNode at " + 
metricsUrl,
                       metricsContent);
@@ -478,8 +492,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
     return result;
   }
 
-  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
-    for (final DataNodeWrapper dataNodeWrapper : 
senderEnv.getDataNodeWrapperList()) {
+  private DataNodeWrapper findDataNodeWrapperByPort(final BaseEnv env, final 
int port) {
+    for (final DataNodeWrapper dataNodeWrapper : env.getDataNodeWrapperList()) 
{
       if (dataNodeWrapper.getPort() == port) {
         return dataNodeWrapper;
       }

Reply via email to