This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ad17bf4a97 Pipe: Allow pipe times configured by raw timestamp (#12004)
0ad17bf4a97 is described below

commit 0ad17bf4a977b80719c1a30a62c1690ccd6b1336
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 30 20:30:05 2024 +0800

    Pipe: Allow pipe times configured by raw timestamp (#12004)
---
 .../apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java | 37 +++++++++++++++++++---
 .../org/apache/iotdb/session/pool/SessionPool.java |  6 ++--
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 19 +++++------
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  9 +++---
 .../org/apache/iotdb/db/utils/DateTimeUtils.java   |  8 +++++
 5 files changed, 55 insertions(+), 24 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index 02db92b8cbe..b681a2a6bd1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -228,7 +228,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
       fail(e.getMessage());
     }
 
-    // Scenario 2.4: test when only 'source.end-time' is set
+    // Scenario 2.4: test when only when 'source.start-time' is less than 
'source.end-time'
     String p2_4 =
         String.format(
             "create pipe p2_4"
@@ -249,6 +249,32 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
     }
 
     assertPipeCount(9);
+
+    // ---------------------------------------------------------------------- 
//
+    // Scenario 3: when 'source.start-time' or 'source.end-time' is timestamp 
//
+    // ---------------------------------------------------------------------- 
//
+
+    // Scenario 3.1: test when 'source.start-time' is timestamp string
+    String p3_1 =
+        String.format(
+            "create pipe p3_1"
+                + " with source ("
+                + "'source.start-time'='1000',"
+                + "'source.end-time'='2000.01.01T08:00:00')"
+                + " with connector ("
+                + "'connector'='iotdb-thrift-connector',"
+                + "'connector.ip'='%s',"
+                + "'connector.port'='%s',"
+                + "'connector.batch.enable'='false')",
+            receiverIp, receiverPort);
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(p3_1);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    assertPipeCount(10);
   }
 
   @Test
@@ -273,8 +299,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
             "%s", receiverIp, receiverPort);
 
     List<String> invalidStartTimes =
-        Arrays.asList(
-            "''", "null", "'null'", "'1'", "'-1000-01-01T00:00:00'", 
"'2000-01-01T00:00:0'");
+        Arrays.asList("''", "null", "'null'", "'-1000-01-01T00:00:00'", 
"'2000-01-01T00:00:0'");
     for (String invalidStartTime : invalidStartTimes) {
       try (Connection connection = senderEnv.getConnection();
           Statement statement = connection.createStatement()) {
@@ -639,7 +664,8 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
 
       extractorAttributes.put("extractor.pattern", "root.db.d1");
       extractorAttributes.put("extractor.history.enable", "true");
-      extractorAttributes.put("extractor.history.start-time", 
"1970-01-01T08:00:02+08:00");
+      // 1970-01-01T08:00:02+08:00
+      extractorAttributes.put("extractor.history.start-time", "2000");
       extractorAttributes.put("extractor.history.end-time", 
"1970-01-01T08:00:04+08:00");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
@@ -806,7 +832,8 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
 
       extractorAttributes.put("source.pattern", "root.db.d1");
       extractorAttributes.put("source.start-time", 
"1970-01-01T08:00:02+08:00");
-      extractorAttributes.put("source.end-time", "1970-01-01T08:00:04+08:00");
+      // 1970-01-01T08:00:04+08:00
+      extractorAttributes.put("source.end-time", "4000");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.batch.enable", "false");
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index aab6b118391..4d48e7106c3 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -1032,12 +1032,12 @@ public class SessionPool implements ISessionPool {
 
   /**
    * Insert aligned data in batch format, which can reduce the overhead of 
network. This method is
-   * just like jdbc batch insert, we pack some insert request in batch and 
send them to server If
-   * you want improve your performance, please see insertTablet method.
+   * just like jdbc batch insert, we pack some insert request in batch and 
send them to server. If
+   * you want to improve your performance, please see insertTablet method.
    *
    * @see Session#insertTablet(Tablet)
    */
-  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never 
be throw
+  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never 
be thrown
   @Override
   public void insertAlignedRecords(
       List<String> multiSeriesIds,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 26ac4da4d3f..88c7fbcb6fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -105,13 +104,13 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       try {
         historicalDataExtractionStartTime =
             parameters.hasAnyAttributes(SOURCE_START_TIME_KEY)
-                ? DateTimeUtils.convertDatetimeStrToLong(
-                    parameters.getStringByKeys(SOURCE_START_TIME_KEY), 
ZoneId.systemDefault())
+                ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+                    parameters.getStringByKeys(SOURCE_START_TIME_KEY))
                 : Long.MIN_VALUE;
         historicalDataExtractionEndTime =
             parameters.hasAnyAttributes(SOURCE_END_TIME_KEY)
-                ? DateTimeUtils.convertDatetimeStrToLong(
-                    parameters.getStringByKeys(SOURCE_END_TIME_KEY), 
ZoneId.systemDefault())
+                ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+                    parameters.getStringByKeys(SOURCE_END_TIME_KEY))
                 : Long.MAX_VALUE;
         if (historicalDataExtractionStartTime > 
historicalDataExtractionEndTime) {
           throw new PipeParameterNotValidException(
@@ -138,19 +137,17 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
           isHistoricalExtractorEnabled
                   && parameters.hasAnyAttributes(
                       EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
-              ? DateTimeUtils.convertDatetimeStrToLong(
+              ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
                   parameters.getStringByKeys(
-                      EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY),
-                  ZoneId.systemDefault())
+                      EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY))
               : Long.MIN_VALUE;
       historicalDataExtractionEndTime =
           isHistoricalExtractorEnabled
                   && parameters.hasAnyAttributes(
                       EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
-              ? DateTimeUtils.convertDatetimeStrToLong(
+              ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
                   parameters.getStringByKeys(
-                      EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY),
-                  ZoneId.systemDefault())
+                      EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY))
               : Long.MAX_VALUE;
       if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) 
{
         throw new PipeParameterNotValidException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index f12c24ee718..49fbaef5e50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -106,13 +105,13 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     try {
       realtimeDataExtractionStartTime =
           parameters.hasAnyAttributes(SOURCE_START_TIME_KEY)
-              ? DateTimeUtils.convertDatetimeStrToLong(
-                  parameters.getStringByKeys(SOURCE_START_TIME_KEY), 
ZoneId.systemDefault())
+              ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+                  parameters.getStringByKeys(SOURCE_START_TIME_KEY))
               : Long.MIN_VALUE;
       realtimeDataExtractionEndTime =
           parameters.hasAnyAttributes(SOURCE_END_TIME_KEY)
-              ? DateTimeUtils.convertDatetimeStrToLong(
-                  parameters.getStringByKeys(SOURCE_END_TIME_KEY), 
ZoneId.systemDefault())
+              ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+                  parameters.getStringByKeys(SOURCE_END_TIME_KEY))
               : Long.MAX_VALUE;
       if (realtimeDataExtractionStartTime > realtimeDataExtractionEndTime) {
         throw new PipeParameterNotValidException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 6c48e26445e..6953c81d630 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -451,6 +451,14 @@ public class DateTimeUtils {
           .appendOptional(ISO_OFFSET_DATE_TIME_WITH_DOT_WITH_SPACE_NS)
           .toFormatter();
 
+  public static long convertTimestampOrDatetimeStrToLongWithDefaultZone(String 
timeStr) {
+    try {
+      return Long.parseLong(timeStr);
+    } catch (NumberFormatException e) {
+      return DateTimeUtils.convertDatetimeStrToLong(timeStr, 
ZoneId.systemDefault());
+    }
+  }
+
   public static long convertDatetimeStrToLong(String str, ZoneId zoneId) {
     return convertDatetimeStrToLong(
         str,

Reply via email to