This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch load--1-time-partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load--1-time-partition by this
push:
new 96c84365e56 Update IoTDBPipeClusterIT.java
96c84365e56 is described below
commit 96c84365e569d93ad03a67cb28efd1831f8660e9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 9 16:43:40 2024 +0800
Update IoTDBPipeClusterIT.java
---
.../pipe/it/autocreate/IoTDBPipeClusterIT.java | 63 ++++++++++++++++++++++
1 file changed, 63 insertions(+)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e0194f8c591..9113e3669ba 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -862,4 +862,67 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
Assert.assertEquals(pipeCount, showPipeResult.size());
}
}
+ // Checks that a pipe delivers rows with zero, negative and 1960-dated timestamps to the receiver.
+ @Test
+ public void testNegativeTimestamp() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ // Address of the receiver-side DataNode; used below as the connector target.
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+ // Client on the sender's leader ConfigNode; try-with-resources closes it when the test ends.
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry( // rows written BEFORE the pipe is created
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (0, 1)",
+ "insert into root.db.d1(time, s1) values (-1, 2)",
+ "insert into root.db.d1(time, s1) values
(1960-01-02T10:00:00+08:00, 2)",
+ "flush"))) {
+ return; // NOTE(review): ends the test with no assertion if the inserts fail - confirm intended
+ }
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+ // One plugin name per pipe stage; only the connector needs extra settings (receiver ip/port).
+ extractorAttributes.put("extractor", "iotdb-extractor");
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ // Create pipe "p1" from the three attribute maps, then start it; both calls must report success.
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+ // Expect the receiver to report a count of 3: the rows written before the pipe was created.
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("3,"));
+ // Write two more rows (one negative timestamp, one at now()) after the pipe has been started.
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (-123, 3)",
+ "insert into root.db.d1(time, s1) values (now(), 3)",
+ "flush"))) {
+ return; // NOTE(review): same silent early exit as above - the count-of-5 check is skipped
+ }
+ // Expect the receiver count to reach 5: the three earlier rows plus the two just written.
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("5,"));
+ }
+ }
}