This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 070a514f141 Pipe IT: Enable Pipe IT to tolerate failures caused by
resource shortage (#11527)
070a514f141 is described below
commit 070a514f141b10b85758531621495c745bfb33e8
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 14 12:10:29 2023 +0800
Pipe IT: Enable Pipe IT to tolerate failures caused by resource shortage
(#11527)
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 57 +++++-
.../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 130 +++++++++-----
.../pipe/it/IoTDBPipeConnectorParallelIT.java | 11 +-
.../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java | 14 +-
.../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 192 ++++++++++++++-------
.../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 46 +++--
.../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java | 6 +-
7 files changed, 329 insertions(+), 127 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 4e48ee71ffb..8c0965b951a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -388,6 +388,29 @@ public class TestUtils {
}
}
+ // This method does not fail the test when the execution fails.
+ // Instead, it returns a flag to indicate the result of the execution.
+ public static boolean tryExecuteNonQueryWithRetry(BaseEnv env, String sql) {
+ for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+ try (Connection connection = env.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ return true;
+ } catch (SQLException e) {
+ if (retryCountLeft > 0) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignored) {
+ }
+ } else {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+ return false;
+ }
+
public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
BaseEnv env, DataNodeWrapper wrapper, String sql) {
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
@@ -409,6 +432,30 @@ public class TestUtils {
}
}
+ // This method does not fail the test when the execution fails.
+ // Instead, it returns a flag to indicate the result of the execution.
+ public static boolean tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+ BaseEnv env, DataNodeWrapper wrapper, String sql) {
+ for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+ try (Connection connection =
env.getConnectionWithSpecifiedDataNode(wrapper);
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ return true;
+ } catch (SQLException e) {
+ if (retryCountLeft > 0) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignored) {
+ }
+ } else {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+ return false;
+ }
+
public static void executeQuery(String sql) {
executeQuery(sql, "root", "root");
}
@@ -532,12 +579,18 @@ public class TestUtils {
BaseEnv env, String sql, String expectedHeader, Set<String>
expectedResSet) {
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
await()
.atMost(600, TimeUnit.SECONDS)
.untilAsserted(
- () ->
+ () -> {
+ try {
TestUtils.assertResultSetEqual(
- executeQueryWithRetry(statement, sql), expectedHeader,
expectedResSet));
+ executeQueryWithRetry(statement, sql), expectedHeader,
expectedResSet);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ });
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 5792ea3598c..bd826f1b804 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -117,11 +117,17 @@ public class IoTDBPipeClusterIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values
(2010-01-01T10:00:00+08:00, 1)");
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values
(2010-01-02T10:00:00+08:00, 2)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values
(2010-01-01T10:00:00+08:00, 1)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values
(2010-01-02T10:00:00+08:00, 2)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -160,9 +166,13 @@ public class IoTDBPipeClusterIT {
"count(root.db.d1.s1),",
Collections.singleton("1,"));
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -202,9 +212,13 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
AtomicInteger leaderPort = new AtomicInteger(-1);
TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
@@ -234,12 +248,16 @@ public class IoTDBPipeClusterIT {
fail();
}
- TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+ if (TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
senderEnv,
senderEnv.getDataNodeWrapper(leaderIndex),
- "insert into root.db.d1(time, s1) values (2, 2)");
- TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
- senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush");
+ "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -275,9 +293,13 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p2").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -317,16 +339,25 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
senderEnv.registerNewDataNode(true);
DataNodeWrapper newDataNode =
senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1);
- TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
- senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2,
2)");
- TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(senderEnv,
newDataNode, "flush");
+ if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2,
2)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv, newDataNode, "flush")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
"select count(*) from root.db.d1",
@@ -361,9 +392,13 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p2").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -451,14 +486,17 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ AtomicInteger succeedNum = new AtomicInteger(0);
Thread t =
new Thread(
() -> {
try {
for (int i = 0; i < 100; ++i) {
- TestUtils.executeNonQueryWithRetry(
+ if (TestUtils.tryExecuteNonQueryWithRetry(
senderEnv,
- String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ succeedNum.incrementAndGet();
+ }
Thread.sleep(100);
}
} catch (InterruptedException ignored) {
@@ -472,7 +510,7 @@ public class IoTDBPipeClusterIT {
receiverEnv,
"select count(*) from root.db.d1",
"count(root.db.d1.s1),",
- Collections.singleton("100,"));
+ Collections.singleton(succeedNum.get() + ","));
senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
@@ -507,9 +545,12 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ int succeedNum = 0;
for (int i = 0; i < 100; ++i) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ succeedNum++;
+ }
}
senderEnv.registerNewDataNode(true);
@@ -518,7 +559,7 @@ public class IoTDBPipeClusterIT {
receiverEnv,
"select count(*) from root.db.d1",
"count(root.db.d1.s1),",
- Collections.singleton("100,"));
+ Collections.singleton(succeedNum + ","));
senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
@@ -553,9 +594,13 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ int succeedNum = 0;
for (int i = 0; i < 100; ++i) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i * 1000));
+ if (TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv,
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i
* 1000))) {
+ succeedNum++;
+ }
}
senderEnv.registerNewDataNode(false);
@@ -568,7 +613,7 @@ public class IoTDBPipeClusterIT {
receiverEnv,
"select count(*) from root.db.d1",
"count(root.db.d1.s1),",
- Collections.singleton("100,"));
+ Collections.singleton(succeedNum + ","));
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
@@ -605,11 +650,16 @@ public class IoTDBPipeClusterIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
}
+ int succeedNum = 0;
for (int i = 0; i < 100; ++i) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i * 1000));
+ if (TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i * 1000))) {
+ succeedNum++;
+ }
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
}
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.restartCluster(senderEnv);
@@ -617,7 +667,7 @@ public class IoTDBPipeClusterIT {
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
- Collections.singleton("100,"));
+ Collections.singleton(succeedNum + ","));
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
index 85e53dcb242..c8244cf0150 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -113,7 +113,7 @@ public class IoTDBPipeConnectorParallelIT {
statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)");
} catch (SQLException e) {
e.printStackTrace();
- fail(e.getMessage());
+ return;
}
expectedResSet.add("0,1.0,");
@@ -131,11 +131,16 @@ public class IoTDBPipeConnectorParallelIT {
await()
.atMost(600, TimeUnit.SECONDS)
.untilAsserted(
- () ->
+ () -> {
+ try {
TestUtils.assertResultSetEqual(
statement.executeQuery("select * from root.**"),
"Time,root.sg1.d1.s1,",
- expectedResSet));
+ expectedResSet);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ });
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
index 96dbaac1ddc..3c235764975 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
@@ -105,12 +105,14 @@ public class IoTDBPipeDataSyncIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ // Do not fail if the failure has nothing to do with the pipe,
+ // because such failures occur randomly due to resource limitations
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("insert into root.vehicle.d0(time, s1) values (0,
1)");
} catch (SQLException e) {
e.printStackTrace();
- fail(e.getMessage());
+ return;
}
try (Connection connection = receiverEnv.getConnection();
@@ -118,11 +120,17 @@ public class IoTDBPipeDataSyncIT {
await()
.atMost(600, TimeUnit.SECONDS)
.untilAsserted(
- () ->
+ () -> {
+ try {
TestUtils.assertResultSetEqual(
statement.executeQuery("select * from root.**"),
"Time,root.vehicle.d0.s1,",
- Collections.singleton("0,1.0,")));
+ Collections.singleton("0,1.0,"));
+ } catch (Exception e) {
+ // Handle the exception generated during "executeQuery"
+ Assert.fail();
+ }
+ });
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index e0c789b4f9d..166e0a5fd2c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
@@ -79,8 +80,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -107,8 +110,10 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
expectedResSet.add("2,2.0,");
TestUtils.assertDataOnEnv(
@@ -117,8 +122,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
@@ -142,9 +149,13 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -171,8 +182,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("2,2.0,");
@@ -182,8 +195,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
@@ -200,8 +215,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -230,8 +247,10 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
expectedResSet.add("2,2.0,");
TestUtils.assertDataOnEnv(
@@ -240,8 +259,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
@@ -258,8 +279,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -288,8 +311,10 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
expectedResSet.add("2,2.0,");
TestUtils.assertDataOnEnv(
@@ -298,8 +323,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
@@ -316,8 +343,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -346,8 +375,10 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
expectedResSet.add("2,2.0,");
TestUtils.assertDataOnEnv(
@@ -356,8 +387,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
@@ -375,8 +408,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -402,8 +437,10 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,",
expectedResSet);
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
expectedResSet.add("2,2.0,");
TestUtils.assertDataOnEnv(
@@ -416,8 +453,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient ignored =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
expectedResSet.add("3,3.0,");
TestUtils.assertDataOnEnv(
@@ -454,14 +493,17 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ final AtomicInteger succeedNum = new AtomicInteger(0);
Thread t =
new Thread(
() -> {
try {
for (int i = 0; i < 100; ++i) {
- TestUtils.executeNonQueryWithRetry(
+ if (TestUtils.tryExecuteNonQueryWithRetry(
senderEnv,
- String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ succeedNum.incrementAndGet();
+ }
Thread.sleep(100);
}
} catch (InterruptedException ignored) {
@@ -476,7 +518,7 @@ public class IoTDBPipeLifeCycleIT {
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
- Collections.singleton("100,"));
+ Collections.singleton(succeedNum.get() + ","));
}
}
@@ -489,8 +531,10 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- receiverEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -511,8 +555,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -523,8 +569,10 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
Thread.sleep(5000);
TestUtils.assertDataOnEnv(
@@ -547,10 +595,14 @@ public class IoTDBPipeLifeCycleIT {
int receiverPort = receiverDataNode.getPort();
for (int i = 0; i < 100; ++i) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
}
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -574,15 +626,21 @@ public class IoTDBPipeLifeCycleIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
}
for (int i = 100; i < 200; ++i) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
}
for (int i = 200; i < 300; ++i) {
- TestUtils.executeNonQueryWithRetry(
- receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+ return;
}
- TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
receiverEnv.getLeaderConfigNodeConnection()) {
@@ -606,8 +664,10 @@ public class IoTDBPipeLifeCycleIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
}
for (int i = 300; i < 400; ++i) {
- TestUtils.executeNonQueryWithRetry(
- receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
}
Set<String> expectedResSet = new HashSet<>();
@@ -624,15 +684,23 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.restartCluster(receiverEnv);
for (int i = 400; i < 500; ++i) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
}
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
for (int i = 500; i < 600; ++i) {
- TestUtils.executeNonQueryWithRetry(
- receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i));
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values
(%s, 1)", i))) {
+ return;
+ }
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
+ return;
}
- TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
for (int i = 400; i < 600; ++i) {
expectedResSet.add(i + ",1.0,");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index c112aeb51b1..eebdb073772 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -193,8 +193,10 @@ public class IoTDBPipeProtocolIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -231,8 +233,10 @@ public class IoTDBPipeProtocolIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
receiverEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- receiverEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -270,8 +274,10 @@ public class IoTDBPipeProtocolIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -298,8 +304,10 @@ public class IoTDBPipeProtocolIT {
"count(root.db.d1.s1),",
Collections.singleton("1,"));
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -310,8 +318,10 @@ public class IoTDBPipeProtocolIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)")) {
+ return;
+ }
Thread.sleep(5000);
TestUtils.assertDataOnEnv(
@@ -379,8 +389,10 @@ public class IoTDBPipeProtocolIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -401,9 +413,13 @@ public class IoTDBPipeProtocolIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
- TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ return;
+ }
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index 5849c40dc31..df6306898f1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -212,8 +212,10 @@ public class IoTDBPipeSwitchStatusIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ return;
+ }
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();