This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1a52ca3822d Pipe IT: Add tests for iotdb-legacy-pipe-sink & Add
forced-log & file mode tests for real-time mode & Refactor ITs (#11617)
1a52ca3822d is described below
commit 1a52ca3822d852f13042864454d262a243ff68b4
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 27 17:45:54 2023 +0800
Pipe IT: Add tests for iotdb-legacy-pipe-sink & Add forced-log & file mode
tests for real-time mode & Refactor ITs (#11617)
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 49 +++
.../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 189 ++++++++----
.../pipe/it/IoTDBPipeConnectorParallelIT.java | 49 +--
...ipeDataSyncIT.java => IoTDBPipeDataSinkIT.java} | 132 ++++----
.../it/{extractor => }/IoTDBPipeExtractorIT.java | 343 ++++++---------------
.../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 33 +-
.../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 34 +-
7 files changed, 397 insertions(+), 432 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 8c0965b951a..f7aac47b65f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -411,6 +411,31 @@ public class TestUtils {
return false;
}
+  /**
+   * Executes the given non-query statements, in order, on a connection obtained from {@code env},
+   * retrying the whole list whenever a {@link SQLException} is thrown.
+   *
+   * <p>This method does not throw when the execution fails. Instead, it returns a flag that
+   * indicates the result, so that the caller can decide how to proceed. Note that each retry
+   * starts again from the first statement, so statements that succeeded in a failed attempt are
+   * executed again.
+   *
+   * @param env the environment that provides the connection
+   * @param sqlList the statements to execute, in order
+   * @return {@code true} if all statements succeeded within a single attempt; {@code false} if
+   *     every attempt failed or the thread was interrupted while waiting to retry
+   */
+  public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, List<String> sqlList) {
+    // One initial attempt plus up to 10 retries.
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = env.getConnection();
+          Statement statement = connection.createStatement()) {
+        for (String sql : sqlList) {
+          statement.execute(sql);
+        }
+        return true;
+      } catch (SQLException e) {
+        if (retryCountLeft == 0) {
+          // Out of retries: report the last failure and give up.
+          e.printStackTrace();
+          return false;
+        }
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException interrupted) {
+          // Restore the interrupt status and stop retrying; another sleep would only throw again.
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
BaseEnv env, DataNodeWrapper wrapper, String sql) {
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
@@ -456,6 +481,30 @@ public class TestUtils {
return false;
}
+  /**
+   * Executes the given non-query statements, in order, on a connection obtained from {@code env}
+   * for the specified data node, retrying the whole list whenever a {@link SQLException} is
+   * thrown.
+   *
+   * <p>This method does not throw when the execution fails. Instead, it returns a flag that
+   * indicates the result, so that the caller can decide how to proceed. Note that each retry
+   * starts again from the first statement, so statements that succeeded in a failed attempt are
+   * executed again.
+   *
+   * @param env the environment that provides the connection
+   * @param wrapper the data node passed to {@code env.getConnectionWithSpecifiedDataNode}
+   * @param sqlList the statements to execute, in order
+   * @return {@code true} if all statements succeeded within a single attempt; {@code false} if
+   *     every attempt failed or the thread was interrupted while waiting to retry
+   */
+  public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
+      BaseEnv env, DataNodeWrapper wrapper, List<String> sqlList) {
+    // One initial attempt plus up to 10 retries.
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = env.getConnectionWithSpecifiedDataNode(wrapper);
+          Statement statement = connection.createStatement()) {
+        for (String sql : sqlList) {
+          statement.execute(sql);
+        }
+        return true;
+      } catch (SQLException e) {
+        if (retryCountLeft == 0) {
+          // Out of retries: report the last failure and give up.
+          e.printStackTrace();
+          return false;
+        }
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException interrupted) {
+          // Restore the interrupt status and stop retrying; another sleep would only throw again.
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
public static void executeQuery(String sql) {
executeQuery(sql, "root", "root");
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 632e74e19eb..8ead12976c2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.pipe.it;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
@@ -46,7 +47,9 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -118,18 +121,14 @@ public class IoTDBPipeClusterIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values
(2010-01-01T10:00:00+08:00, 1)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values
(2010-01-02T10:00:00+08:00, 2)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values
(2010-01-01T10:00:00+08:00, 1)",
+ "insert into root.db.d1(time, s1) values
(2010-01-02T10:00:00+08:00, 2)",
+ "flush"))) {
return;
}
-
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();
@@ -167,11 +166,9 @@ public class IoTDBPipeClusterIT {
"count(root.db.d1.s1),",
Collections.singleton("1,"));
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.db.d1(time, s1) values (now(), 3)",
"flush"))) {
return;
}
@@ -213,11 +210,8 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values
(1, 1)", "flush"))) {
return;
}
@@ -236,27 +230,33 @@ public class IoTDBPipeClusterIT {
for (int i = 0; i < 3; ++i) {
if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
leaderIndex = i;
- senderEnv.shutdownDataNode(i);
+ try {
+ senderEnv.shutdownDataNode(i);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
- senderEnv.startDataNode(i);
- ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+ try {
+ senderEnv.startDataNode(i);
+ ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
}
}
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}
- if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+ if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
senderEnv,
senderEnv.getDataNodeWrapper(leaderIndex),
- "insert into root.db.d1(time, s1) values (2, 2)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
- senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush")) {
+ Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)",
"flush"))) {
return;
}
@@ -267,8 +267,13 @@ public class IoTDBPipeClusterIT {
Collections.singleton("2,"));
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -294,11 +299,8 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p2").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d2(time, s1) values
(1, 1)", "flush"))) {
return;
}
@@ -340,23 +342,23 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values
(1, 1)", "flush"))) {
return;
}
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+
+ try {
+ senderEnv.registerNewDataNode(true);
+ } catch (Exception e) {
+ e.printStackTrace();
return;
}
-
- senderEnv.registerNewDataNode(true);
DataNodeWrapper newDataNode =
senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1);
- if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
- senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2,
2)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
- senderEnv, newDataNode, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
+ senderEnv,
+ newDataNode,
+ Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)",
"flush"))) {
return;
}
TestUtils.assertDataOnEnv(
@@ -366,8 +368,13 @@ public class IoTDBPipeClusterIT {
Collections.singleton("2,"));
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -393,11 +400,8 @@ public class IoTDBPipeClusterIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p2").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d2(time, s1) values
(1, 1)", "flush"))) {
return;
}
@@ -438,8 +442,9 @@ public class IoTDBPipeClusterIT {
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
} catch (TException e) {
+ // Not sure if the "createPipe" has succeeded
e.printStackTrace();
- fail(e.getMessage());
+ return;
}
try {
Thread.sleep(100);
@@ -448,7 +453,12 @@ public class IoTDBPipeClusterIT {
}
});
t.start();
- senderEnv.registerNewDataNode(true);
+ try {
+ senderEnv.registerNewDataNode(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
t.join();
}
@@ -504,7 +514,12 @@ public class IoTDBPipeClusterIT {
}
});
t.start();
- senderEnv.registerNewDataNode(true);
+ try {
+ senderEnv.registerNewDataNode(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
t.join();
TestUtils.assertDataOnEnv(
@@ -513,8 +528,12 @@ public class IoTDBPipeClusterIT {
"count(root.db.d1.s1),",
Collections.singleton(succeedNum.get() + ","));
- senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
-
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
+ try {
+ senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
+
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
@@ -554,7 +573,12 @@ public class IoTDBPipeClusterIT {
}
}
- senderEnv.registerNewDataNode(true);
+ try {
+ senderEnv.registerNewDataNode(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -562,8 +586,12 @@ public class IoTDBPipeClusterIT {
"count(root.db.d1.s1),",
Collections.singleton(succeedNum + ","));
- senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
-
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
+ try {
+ senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
+
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
@@ -609,11 +637,16 @@ public class IoTDBPipeClusterIT {
}
}
- senderEnv.registerNewDataNode(false);
- senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
- senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
-
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
- ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+ try {
+ senderEnv.registerNewDataNode(false);
+ senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+ senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() -
1);
+
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
- 1);
+ ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -667,7 +700,12 @@ public class IoTDBPipeClusterIT {
return;
}
- TestUtils.restartCluster(senderEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
TestUtils.assertDataOnEnv(
receiverEnv,
@@ -708,7 +746,12 @@ public class IoTDBPipeClusterIT {
if (status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successCount.updateAndGet(v -> v + 1);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (TException | ClientManagerException | IOException e) {
+ e.printStackTrace();
} catch (Exception e) {
+ // Fail iff pipe exception occurs
e.printStackTrace();
fail(e.getMessage());
}
@@ -733,7 +776,12 @@ public class IoTDBPipeClusterIT {
if (status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successCount.updateAndGet(v -> v + 1);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (TException | ClientManagerException | IOException e) {
+ e.printStackTrace();
} catch (Exception e) {
+ // Fail iff pipe exception occurs
e.printStackTrace();
fail(e.getMessage());
}
@@ -798,7 +846,12 @@ public class IoTDBPipeClusterIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (TException | ClientManagerException | IOException e) {
+ e.printStackTrace();
} catch (Exception e) {
+ // Fail iff pipe exception occurs
e.printStackTrace();
fail(e.getMessage());
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
index c8244cf0150..271957d61f9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -37,17 +37,11 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
@@ -105,14 +99,13 @@ public class IoTDBPipeConnectorParallelIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, s1) values (0, 1)");
- statement.execute("insert into root.sg1.d1(time, s1) values (1, 2)");
- statement.execute("insert into root.sg1.d1(time, s1) values (2, 3)");
- statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)");
- } catch (SQLException e) {
- e.printStackTrace();
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.sg1.d1(time, s1) values (0, 1)",
+ "insert into root.sg1.d1(time, s1) values (1, 2)",
+ "insert into root.sg1.d1(time, s1) values (2, 3)",
+ "insert into root.sg1.d1(time, s1) values (3, 4)"))) {
return;
}
@@ -120,30 +113,8 @@ public class IoTDBPipeConnectorParallelIT {
expectedResSet.add("1,2.0,");
expectedResSet.add("2,3.0,");
expectedResSet.add("3,4.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
- assertDataOnReceiver(receiverEnv, expectedResSet);
- }
- }
-
- private void assertDataOnReceiver(BaseEnv receiverEnv, Set<String>
expectedResSet) {
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select * from root.**"),
- "Time,root.sg1.d1.s1,",
- expectedResSet);
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.sg1.d1.s1,",
expectedResSet);
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
similarity index 63%
rename from
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
index 3c235764975..52499cd14d4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
@@ -37,20 +37,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeDataSyncIT {
+public class IoTDBPipeDataSinkIT {
private BaseEnv senderEnv;
private BaseEnv receiverEnv;
@@ -75,7 +69,7 @@ public class IoTDBPipeDataSyncIT {
}
@Test
- public void testEnv() throws Exception {
+ public void testThriftConnector() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
String receiverIp = receiverDataNode.getIp();
@@ -107,34 +101,65 @@ public class IoTDBPipeDataSyncIT {
// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.vehicle.d0(time, s1) values (0,
1)");
- } catch (SQLException e) {
- e.printStackTrace();
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) {
return;
}
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select * from root.**"),
- "Time,root.vehicle.d0.s1,",
- Collections.singleton("0,1.0,"));
- } catch (Exception e) {
- // Handle the exception generated during "executeQuery"
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.singleton("0,1.0,"));
+ }
+ }
+
+ @Test
+ public void testLegacyConnector() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("source.realtime.mode", "log");
+
+ connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
+ connectorAttributes.put("sink.batch.enable", "false");
+ connectorAttributes.put("sink.ip", receiverIp);
+ connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+
+ // This version does not matter since it's no longer checked by the
legacy receiver
+ connectorAttributes.put("sink.version", "1.3");
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) {
+ return;
}
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.singleton("0,1.0,"));
}
}
@@ -166,37 +191,20 @@ public class IoTDBPipeDataSyncIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("create aligned timeseries root.sg.d1(s0 float, s1
float)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("create aligned timeseries root.sg.d1(s0 float, s1
float)");
- statement.execute("insert into root.sg.d1(time, s0, s1) values (3,
null, 25.34)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ receiverEnv,
+ Arrays.asList(
+ "create aligned timeseries root.sg.d1(s0 float, s1 float)",
+ "create aligned timeseries root.sg.d1(s0 float, s1 float)",
+ "insert into root.sg.d1(time, s0, s1) values (3, null,
25.34)"))) {
+ return;
}
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select * from root.**"),
- "Time,root.sg.d1.s0,root.sg.d1.s1,",
- Collections.singleton("3,null,25.34,")));
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.sg.d1.s0,root.sg.d1.s1,",
+ Collections.singleton("3,null,25.34,"));
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
similarity index 65%
rename from
integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index e7aa84fd681..a8f0828bfa9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.pipe.it.extractor;
+package org.apache.iotdb.pipe.it;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -48,10 +48,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -211,53 +208,23 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.nonAligned.1TS (time, s_float) values
(now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.nonAligned.100TS (time, s_float) values
(now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.nonAligned.1000TS (time, s_float)
values (now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.nonAligned.`1(TS)` (time, s_float)
values (now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
- "insert into root.nonAligned.6TS.`6` ("
- + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`,
`s_text(1)`, `s_bool(1)`) "
- + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.aligned.1TS (time, s_float) aligned
values (now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv,
- "insert into root.aligned.100TS (time, s_float) aligned values
(now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv,
- "insert into root.aligned.1000TS (time, s_float) aligned values
(now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv,
- "insert into root.aligned.`1(TS)` (time, s_float) aligned values
(now(), 0.5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv,
- "insert into root.aligned.6TS.`6` ("
- + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`,
`s_text(1)`, `s_bool(1)`) "
- + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)")) {
+ Arrays.asList(
+ "insert into root.nonAligned.1TS (time, s_float) values (now(),
0.5)",
+ "insert into root.nonAligned.100TS (time, s_float) values
(now(), 0.5)",
+ "insert into root.nonAligned.1000TS (time, s_float) values
(now(), 0.5)",
+ "insert into root.nonAligned.`1(TS)` (time, s_float) values
(now(), 0.5)",
+ "insert into root.nonAligned.6TS.`6` ("
+ + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`,
`s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+ + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)",
+ "insert into root.aligned.1TS (time, s_float) aligned values
(now(), 0.5)",
+ "insert into root.aligned.100TS (time, s_float) aligned values
(now(), 0.5)",
+ "insert into root.aligned.1000TS (time, s_float) aligned values
(now(), 0.5)",
+ "insert into root.aligned.`1(TS)` (time, s_float) aligned values
(now(), 0.5)",
+ "insert into root.aligned.6TS.`6` ("
+ + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`,
`s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+ + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\",
true)"))) {
return;
}
@@ -304,36 +271,22 @@ public class IoTDBPipeExtractorIT {
assertTimeseriesCountOnReceiver(receiverEnv,
expectedTimeseriesCount.get(i));
}
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- Set<String> expectedDevices = new HashSet<>();
- expectedDevices.add("root.nonAligned.1TS,false,");
- expectedDevices.add("root.nonAligned.100TS,false,");
- expectedDevices.add("root.nonAligned.1000TS,false,");
- expectedDevices.add("root.nonAligned.`1(TS)`,false,");
- expectedDevices.add("root.nonAligned.6TS.`6`,false,");
- expectedDevices.add("root.aligned.1TS,true,");
- expectedDevices.add("root.aligned.100TS,true,");
- expectedDevices.add("root.aligned.1000TS,true,");
- expectedDevices.add("root.aligned.`1(TS)`,true,");
- expectedDevices.add("root.aligned.6TS.`6`,true,");
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("show devices"),
- "Device,IsAligned,",
- expectedDevices);
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "show devices",
+ "Device,IsAligned,",
+ new HashSet<>(
+ Arrays.asList(
+ "root.nonAligned.1TS,false,",
+ "root.nonAligned.100TS,false,",
+ "root.nonAligned.1000TS,false,",
+ "root.nonAligned.`1(TS)`,false,",
+ "root.nonAligned.6TS.`6`,false,",
+ "root.aligned.1TS,true,",
+ "root.aligned.100TS,true,",
+ "root.aligned.1000TS,true,",
+ "root.aligned.`1(TS)`,true,",
+ "root.aligned.6TS.`6`,true,")));
}
}
@@ -367,15 +320,12 @@ public class IoTDBPipeExtractorIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
assertTimeseriesCountOnReceiver(receiverEnv, 0);
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db1.d1 (time, at1) values (1, 10)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db2.d1 (time, at1) values (1, 20)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db1.d1 (time, at1) values (1, 10)",
+ "insert into root.db2.d1 (time, at1) values (1, 20)",
+ "flush"))) {
return;
}
@@ -395,15 +345,12 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("p2").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db1.d1 (time, at1) values (2, 11)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db2.d1 (time, at1) values (2, 21)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db1.d1 (time, at1) values (2, 11)",
+ "insert into root.db2.d1 (time, at1) values (2, 21)",
+ "flush"))) {
return;
}
@@ -417,25 +364,11 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p3").getCode());
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select count(*) from root.**"),
- "count(root.db1.d1.at1),count(root.db2.d1.at1),",
- Collections.singleton("2,2,"));
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db1.d1.at1),count(root.db2.d1.at1),",
+ Collections.singleton("2,2,"));
}
}
@@ -448,23 +381,14 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1 (time, at1) values (1, 10)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d2 (time, at1) values (1, 20)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d3 (time, at1) values (1, 30)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d4 (time, at1) values (1, 40)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (1, 10)",
+ "insert into root.db.d2 (time, at1) values (1, 20)",
+ "insert into root.db.d3 (time, at1) values (1, 30)",
+ "insert into root.db.d4 (time, at1) values (1, 40)",
+ "flush"))) {
return;
}
@@ -524,55 +448,26 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p4").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1 (time, at1) values (2, 11)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d2 (time, at1) values (2, 21)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d3 (time, at1) values (2, 31)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d4 (time, at1) values (2, 41)")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (2, 11)",
+ "insert into root.db.d2 (time, at1) values (2, 21)",
+ "insert into root.db.d3 (time, at1) values (2, 31)",
+ "insert into root.db.d4 (time, at1) values (2, 41)"))) {
return;
}
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select count(*) from root.** where time <= 1"),
- "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
- Collections.singleton("1,0,1,"));
- } catch (Exception e) {
- Assert.fail();
- }
- });
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select count(*) from root.** where time >= 2"),
- "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
- Collections.singleton("1,1,0,"));
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.** where time <= 1",
+ "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
+ Collections.singleton("1,0,1,"));
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.** where time >= 2",
+ "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
+ Collections.singleton("1,1,0,"));
}
}
@@ -585,19 +480,14 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv,
- "insert into root.db.d1 (time, at1)"
- + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
- "insert into root.db.d2 (time, at1)"
- + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1)"
+ + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)",
+ "insert into root.db.d2 (time, at1)"
+ + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)",
+ "flush"))) {
return;
}
@@ -624,25 +514,11 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select count(*) from root.**"),
- "count(root.db.d1.at1),",
- Collections.singleton("3,"));
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.at1),",
+ Collections.singleton("3,"));
extractorAttributes.remove("extractor.pattern");
status =
@@ -654,47 +530,16 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p2").getCode());
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select count(*) from root.**"),
- "count(root.db.d1.at1),count(root.db.d2.at1),",
- Collections.singleton("3,3,"));
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.at1),count(root.db.d2.at1),",
+ Collections.singleton("3,3,"));
}
}
private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) {
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- try {
- TestUtils.assertResultSetEqual(
- statement.executeQuery("count timeseries"),
- "count(timeseries),",
- Collections.singleton(count + ","));
- } catch (Exception e) {
- Assert.fail();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton(count + ","));
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index 166e0a5fd2c..2cd8acc60c2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -37,6 +37,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -149,11 +150,8 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
- return;
- }
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)", "flush"))) {
return;
}
@@ -447,8 +445,13 @@ public class IoTDBPipeLifeCycleIT {
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
try (SyncConfigNodeIServiceClient ignored =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -511,7 +514,12 @@ public class IoTDBPipeLifeCycleIT {
});
t.start();
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(receiverEnv);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
t.join();
TestUtils.assertDataOnEnv(
@@ -680,8 +688,13 @@ public class IoTDBPipeLifeCycleIT {
TestUtils.assertDataOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- TestUtils.restartCluster(senderEnv);
- TestUtils.restartCluster(receiverEnv);
+ try {
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
for (int i = 400; i < 500; ++i) {
if (!TestUtils.tryExecuteNonQueryWithRetry(
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index 1290e46b94a..6bcaee39b56 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -39,6 +39,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -402,6 +403,9 @@ public class IoTDBPipeProtocolIT {
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.node-urls", nodeUrlsBuilder.toString());
+ // Test forced-log mode, in TimechoDB this might be "file"
+ extractorAttributes.put("source.realtime.mode", "forced-log");
+
TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
@@ -413,11 +417,33 @@ public class IoTDBPipeProtocolIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
- if (!TestUtils.tryExecuteNonQueryWithRetry(
- senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)", "flush"))) {
return;
}
- if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+
+ // Test file mode
+ extractorAttributes.replace("source.realtime.mode", "file");
+
+ status =
+ client.createPipe(
+ new TCreatePipeReq("p2", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ System.out.println(status.getMessage());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (3, 3)", "flush"))) {
return;
}
@@ -425,7 +451,7 @@ public class IoTDBPipeProtocolIT {
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
- Collections.singleton("2,"));
+ Collections.singleton("3,"));
}
}
}