This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 070a514f141 Pipe IT: Enable Pipe IT to tolerate failures caused by 
resource shortage (#11527)
070a514f141 is described below

commit 070a514f141b10b85758531621495c745bfb33e8
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 14 12:10:29 2023 +0800

    Pipe IT: Enable Pipe IT to tolerate failures caused by resource shortage 
(#11527)
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  57 +++++-
 .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java   | 130 +++++++++-----
 .../pipe/it/IoTDBPipeConnectorParallelIT.java      |  11 +-
 .../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java  |  14 +-
 .../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 192 ++++++++++++++-------
 .../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java  |  46 +++--
 .../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java     |   6 +-
 7 files changed, 329 insertions(+), 127 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 4e48ee71ffb..8c0965b951a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -388,6 +388,29 @@ public class TestUtils {
     }
   }
 
+  // This method will not throw failure given that a failure is encountered.
+  // Instead, it return a flag to indicate the result of the execution.
+  public static boolean tryExecuteNonQueryWithRetry(BaseEnv env, String sql) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = env.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute(sql);
+        return true;
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
   public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
       BaseEnv env, DataNodeWrapper wrapper, String sql) {
     for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
@@ -409,6 +432,30 @@ public class TestUtils {
     }
   }
 
+  // This method will not throw failure given that a failure is encountered.
+  // Instead, it return a flag to indicate the result of the execution.
+  public static boolean tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+      BaseEnv env, DataNodeWrapper wrapper, String sql) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = 
env.getConnectionWithSpecifiedDataNode(wrapper);
+          Statement statement = connection.createStatement()) {
+        statement.execute(sql);
+        return true;
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
   public static void executeQuery(String sql) {
     executeQuery(sql, "root", "root");
   }
@@ -532,12 +579,18 @@ public class TestUtils {
       BaseEnv env, String sql, String expectedHeader, Set<String> 
expectedResSet) {
     try (Connection connection = env.getConnection();
         Statement statement = connection.createStatement()) {
+      // Keep retrying if there are execution failure
       await()
           .atMost(600, TimeUnit.SECONDS)
           .untilAsserted(
-              () ->
+              () -> {
+                try {
                   TestUtils.assertResultSetEqual(
-                      executeQueryWithRetry(statement, sql), expectedHeader, 
expectedResSet));
+                      executeQueryWithRetry(statement, sql), expectedHeader, 
expectedResSet);
+                } catch (Exception e) {
+                  Assert.fail();
+                }
+              });
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 5792ea3598c..bd826f1b804 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -117,11 +117,17 @@ public class IoTDBPipeClusterIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values 
(2010-01-01T10:00:00+08:00, 1)");
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values 
(2010-01-02T10:00:00+08:00, 2)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values 
(2010-01-01T10:00:00+08:00, 1)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values 
(2010-01-02T10:00:00+08:00, 2)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -160,9 +166,13 @@ public class IoTDBPipeClusterIT {
           "count(root.db.d1.s1),",
           Collections.singleton("1,"));
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -202,9 +212,13 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       AtomicInteger leaderPort = new AtomicInteger(-1);
       TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
@@ -234,12 +248,16 @@ public class IoTDBPipeClusterIT {
         fail();
       }
 
-      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+      if (TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
           senderEnv,
           senderEnv.getDataNodeWrapper(leaderIndex),
-          "insert into root.db.d1(time, s1) values (2, 2)");
-      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
-          senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush");
+          "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+          senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -275,9 +293,13 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p2").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -317,16 +339,25 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       senderEnv.registerNewDataNode(true);
       DataNodeWrapper newDataNode =
           
senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1);
-      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
-          senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2, 
2)");
-      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(senderEnv, 
newDataNode, "flush");
+      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+          senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2, 
2)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+          senderEnv, newDataNode, "flush")) {
+        return;
+      }
       TestUtils.assertDataOnEnv(
           receiverEnv,
           "select count(*) from root.db.d1",
@@ -361,9 +392,13 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p2").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -451,14 +486,17 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
+      AtomicInteger succeedNum = new AtomicInteger(0);
       Thread t =
           new Thread(
               () -> {
                 try {
                   for (int i = 0; i < 100; ++i) {
-                    TestUtils.executeNonQueryWithRetry(
+                    if (TestUtils.tryExecuteNonQueryWithRetry(
                         senderEnv,
-                        String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+                        String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+                      succeedNum.incrementAndGet();
+                    }
                     Thread.sleep(100);
                   }
                 } catch (InterruptedException ignored) {
@@ -472,7 +510,7 @@ public class IoTDBPipeClusterIT {
           receiverEnv,
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
-          Collections.singleton("100,"));
+          Collections.singleton(succeedNum.get() + ","));
 
       senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
       
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
@@ -507,9 +545,12 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
+      int succeedNum = 0;
       for (int i = 0; i < 100; ++i) {
-        TestUtils.executeNonQueryWithRetry(
-            senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+        if (TestUtils.tryExecuteNonQueryWithRetry(
+            senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+          succeedNum++;
+        }
       }
 
       senderEnv.registerNewDataNode(true);
@@ -518,7 +559,7 @@ public class IoTDBPipeClusterIT {
           receiverEnv,
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
-          Collections.singleton("100,"));
+          Collections.singleton(succeedNum + ","));
 
       senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
       
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
@@ -553,9 +594,13 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
+      int succeedNum = 0;
       for (int i = 0; i < 100; ++i) {
-        TestUtils.executeNonQueryWithRetry(
-            senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i * 1000));
+        if (TestUtils.tryExecuteNonQueryWithRetry(
+            senderEnv,
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", i 
* 1000))) {
+          succeedNum++;
+        }
       }
 
       senderEnv.registerNewDataNode(false);
@@ -568,7 +613,7 @@ public class IoTDBPipeClusterIT {
           receiverEnv,
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
-          Collections.singleton("100,"));
+          Collections.singleton(succeedNum + ","));
 
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
@@ -605,11 +650,16 @@ public class IoTDBPipeClusterIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
     }
 
+    int succeedNum = 0;
     for (int i = 0; i < 100; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i * 1000));
+      if (TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i * 1000))) {
+        succeedNum++;
+      }
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      return;
     }
-    TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
     TestUtils.restartCluster(senderEnv);
 
@@ -617,7 +667,7 @@ public class IoTDBPipeClusterIT {
         receiverEnv,
         "select count(*) from root.**",
         "count(root.db.d1.s1),",
-        Collections.singleton("100,"));
+        Collections.singleton(succeedNum + ","));
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
index 85e53dcb242..c8244cf0150 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -113,7 +113,7 @@ public class IoTDBPipeConnectorParallelIT {
         statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)");
       } catch (SQLException e) {
         e.printStackTrace();
-        fail(e.getMessage());
+        return;
       }
 
       expectedResSet.add("0,1.0,");
@@ -131,11 +131,16 @@ public class IoTDBPipeConnectorParallelIT {
       await()
           .atMost(600, TimeUnit.SECONDS)
           .untilAsserted(
-              () ->
+              () -> {
+                try {
                   TestUtils.assertResultSetEqual(
                       statement.executeQuery("select * from root.**"),
                       "Time,root.sg1.d1.s1,",
-                      expectedResSet));
+                      expectedResSet);
+                } catch (Exception e) {
+                  Assert.fail();
+                }
+              });
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
index 96dbaac1ddc..3c235764975 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
@@ -105,12 +105,14 @@ public class IoTDBPipeDataSyncIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
 
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
       try (Connection connection = senderEnv.getConnection();
           Statement statement = connection.createStatement()) {
         statement.execute("insert into root.vehicle.d0(time, s1) values (0, 
1)");
       } catch (SQLException e) {
         e.printStackTrace();
-        fail(e.getMessage());
+        return;
       }
 
       try (Connection connection = receiverEnv.getConnection();
@@ -118,11 +120,17 @@ public class IoTDBPipeDataSyncIT {
         await()
             .atMost(600, TimeUnit.SECONDS)
             .untilAsserted(
-                () ->
+                () -> {
+                  try {
                     TestUtils.assertResultSetEqual(
                         statement.executeQuery("select * from root.**"),
                         "Time,root.vehicle.d0.s1,",
-                        Collections.singleton("0,1.0,")));
+                        Collections.singleton("0,1.0,"));
+                  } catch (Exception e) {
+                    // Handle the exception generated during "executeQuery"
+                    Assert.fail();
+                  }
+                });
       } catch (Exception e) {
         e.printStackTrace();
         fail(e.getMessage());
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index e0c789b4f9d..166e0a5fd2c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
@@ -79,8 +80,10 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -107,8 +110,10 @@ public class IoTDBPipeLifeCycleIT {
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       expectedResSet.add("2,2.0,");
       TestUtils.assertDataOnEnv(
@@ -117,8 +122,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
@@ -142,9 +149,13 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -171,8 +182,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add("2,2.0,");
@@ -182,8 +195,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
@@ -200,8 +215,10 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -230,8 +247,10 @@ public class IoTDBPipeLifeCycleIT {
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       expectedResSet.add("2,2.0,");
       TestUtils.assertDataOnEnv(
@@ -240,8 +259,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
@@ -258,8 +279,10 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -288,8 +311,10 @@ public class IoTDBPipeLifeCycleIT {
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       expectedResSet.add("2,2.0,");
       TestUtils.assertDataOnEnv(
@@ -298,8 +323,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
@@ -316,8 +343,10 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -346,8 +375,10 @@ public class IoTDBPipeLifeCycleIT {
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       expectedResSet.add("2,2.0,");
       TestUtils.assertDataOnEnv(
@@ -356,8 +387,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
@@ -375,8 +408,10 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -402,8 +437,10 @@ public class IoTDBPipeLifeCycleIT {
       TestUtils.assertDataOnEnv(
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       expectedResSet.add("2,2.0,");
       TestUtils.assertDataOnEnv(
@@ -416,8 +453,10 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient ignored =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       expectedResSet.add("3,3.0,");
       TestUtils.assertDataOnEnv(
@@ -454,14 +493,17 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
+      final AtomicInteger succeedNum = new AtomicInteger(0);
       Thread t =
           new Thread(
               () -> {
                 try {
                   for (int i = 0; i < 100; ++i) {
-                    TestUtils.executeNonQueryWithRetry(
+                    if (TestUtils.tryExecuteNonQueryWithRetry(
                         senderEnv,
-                        String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+                        String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+                      succeedNum.incrementAndGet();
+                    }
                     Thread.sleep(100);
                   }
                 } catch (InterruptedException ignored) {
@@ -476,7 +518,7 @@ public class IoTDBPipeLifeCycleIT {
           receiverEnv,
           "select count(*) from root.**",
           "count(root.db.d1.s1),",
-          Collections.singleton("100,"));
+          Collections.singleton(succeedNum.get() + ","));
     }
   }
 
@@ -489,8 +531,10 @@ public class IoTDBPipeLifeCycleIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      TestUtils.executeNonQueryWithRetry(
-          receiverEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -511,8 +555,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -523,8 +569,10 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       Thread.sleep(5000);
       TestUtils.assertDataOnEnv(
@@ -547,10 +595,14 @@ public class IoTDBPipeLifeCycleIT {
     int receiverPort = receiverDataNode.getPort();
 
     for (int i = 0; i < 100; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      return;
     }
-    TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
@@ -574,15 +626,21 @@ public class IoTDBPipeLifeCycleIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
     }
     for (int i = 100; i < 200; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
     }
 
     for (int i = 200; i < 300; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+      return;
     }
-    TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
receiverEnv.getLeaderConfigNodeConnection()) {
@@ -606,8 +664,10 @@ public class IoTDBPipeLifeCycleIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
     }
     for (int i = 300; i < 400; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
     }
 
     Set<String> expectedResSet = new HashSet<>();
@@ -624,15 +684,23 @@ public class IoTDBPipeLifeCycleIT {
     TestUtils.restartCluster(receiverEnv);
 
     for (int i = 400; i < 500; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      return;
     }
-    TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
     for (int i = 500; i < 600; ++i) {
-      TestUtils.executeNonQueryWithRetry(
-          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i))) {
+        return;
+      }
+    }
+    if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+      return;
     }
-    TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
 
     for (int i = 400; i < 600; ++i) {
       expectedResSet.add(i + ",1.0,");
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index c112aeb51b1..eebdb073772 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -193,8 +193,10 @@ public class IoTDBPipeProtocolIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -231,8 +233,10 @@ public class IoTDBPipeProtocolIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
receiverEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          receiverEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -270,8 +274,10 @@ public class IoTDBPipeProtocolIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -298,8 +304,10 @@ public class IoTDBPipeProtocolIT {
           "count(root.db.d1.s1),",
           Collections.singleton("1,"));
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -310,8 +318,10 @@ public class IoTDBPipeProtocolIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+        return;
+      }
 
       Thread.sleep(5000);
       TestUtils.assertDataOnEnv(
@@ -379,8 +389,10 @@ public class IoTDBPipeProtocolIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -401,9 +413,13 @@ public class IoTDBPipeProtocolIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
-      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+        return;
+      }
+      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index 5849c40dc31..df6306898f1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -212,8 +212,10 @@ public class IoTDBPipeSwitchStatusIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      TestUtils.executeNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();


Reply via email to