This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0ad17bf4a97 Pipe: Allow pipe times configured by raw timestamp (#12004)
0ad17bf4a97 is described below
commit 0ad17bf4a977b80719c1a30a62c1690ccd6b1336
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jan 30 20:30:05 2024 +0800
Pipe: Allow pipe times configured by raw timestamp (#12004)
---
.../apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java | 37 +++++++++++++++++++---
.../org/apache/iotdb/session/pool/SessionPool.java | 6 ++--
.../PipeHistoricalDataRegionTsFileExtractor.java | 19 +++++------
.../realtime/PipeRealtimeDataRegionExtractor.java | 9 +++---
.../org/apache/iotdb/db/utils/DateTimeUtils.java | 8 +++++
5 files changed, 55 insertions(+), 24 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index 02db92b8cbe..b681a2a6bd1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -228,7 +228,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
fail(e.getMessage());
}
- // Scenario 2.4: test when only 'source.end-time' is set
+ // Scenario 2.4: test when 'source.start-time' is less than
'source.end-time'
String p2_4 =
String.format(
"create pipe p2_4"
@@ -249,6 +249,32 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
}
assertPipeCount(9);
+
+ // ----------------------------------------------------------------------
//
+ // Scenario 3: when 'source.start-time' or 'source.end-time' is timestamp
//
+ // ----------------------------------------------------------------------
//
+
+ // Scenario 3.1: test when 'source.start-time' is a timestamp string
+ String p3_1 =
+ String.format(
+ "create pipe p3_1"
+ + " with source ("
+ + "'source.start-time'='1000',"
+ + "'source.end-time'='2000.01.01T08:00:00')"
+ + " with connector ("
+ + "'connector'='iotdb-thrift-connector',"
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.batch.enable'='false')",
+ receiverIp, receiverPort);
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(p3_1);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ assertPipeCount(10);
}
@Test
@@ -273,8 +299,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
"%s", receiverIp, receiverPort);
List<String> invalidStartTimes =
- Arrays.asList(
- "''", "null", "'null'", "'1'", "'-1000-01-01T00:00:00'",
"'2000-01-01T00:00:0'");
+ Arrays.asList("''", "null", "'null'", "'-1000-01-01T00:00:00'",
"'2000-01-01T00:00:0'");
for (String invalidStartTime : invalidStartTimes) {
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
@@ -639,7 +664,8 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
extractorAttributes.put("extractor.pattern", "root.db.d1");
extractorAttributes.put("extractor.history.enable", "true");
- extractorAttributes.put("extractor.history.start-time",
"1970-01-01T08:00:02+08:00");
+ // 1970-01-01T08:00:02+08:00
+ extractorAttributes.put("extractor.history.start-time", "2000");
extractorAttributes.put("extractor.history.end-time",
"1970-01-01T08:00:04+08:00");
connectorAttributes.put("connector", "iotdb-thrift-connector");
@@ -806,7 +832,8 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
extractorAttributes.put("source.pattern", "root.db.d1");
extractorAttributes.put("source.start-time",
"1970-01-01T08:00:02+08:00");
- extractorAttributes.put("source.end-time", "1970-01-01T08:00:04+08:00");
+ // 1970-01-01T08:00:04+08:00
+ extractorAttributes.put("source.end-time", "4000");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index aab6b118391..4d48e7106c3 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -1032,12 +1032,12 @@ public class SessionPool implements ISessionPool {
/**
* Insert aligned data in batch format, which can reduce the overhead of
network. This method is
- * just like jdbc batch insert, we pack some insert request in batch and
send them to server If
- * you want improve your performance, please see insertTablet method.
+ * just like jdbc batch insert, we pack some insert request in batch and
send them to server. If
+ * you want to improve your performance, please see insertTablet method.
*
* @see Session#insertTablet(Tablet)
*/
- @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never
be throw
+ @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never
be thrown
@Override
public void insertAlignedRecords(
List<String> multiSeriesIds,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 26ac4da4d3f..88c7fbcb6fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -105,13 +104,13 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
try {
historicalDataExtractionStartTime =
parameters.hasAnyAttributes(SOURCE_START_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getStringByKeys(SOURCE_START_TIME_KEY),
ZoneId.systemDefault())
+ ?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+ parameters.getStringByKeys(SOURCE_START_TIME_KEY))
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
parameters.hasAnyAttributes(SOURCE_END_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getStringByKeys(SOURCE_END_TIME_KEY),
ZoneId.systemDefault())
+ ?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+ parameters.getStringByKeys(SOURCE_END_TIME_KEY))
: Long.MAX_VALUE;
if (historicalDataExtractionStartTime >
historicalDataExtractionEndTime) {
throw new PipeParameterNotValidException(
@@ -138,19 +137,17 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
isHistoricalExtractorEnabled
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
+ ?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
- EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY),
- ZoneId.systemDefault())
+ EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY))
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
isHistoricalExtractorEnabled
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
+ ?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
- EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY),
- ZoneId.systemDefault())
+ EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY))
: Long.MAX_VALUE;
if (historicalDataExtractionStartTime > historicalDataExtractionEndTime)
{
throw new PipeParameterNotValidException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index f12c24ee718..49fbaef5e50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -106,13 +105,13 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
try {
realtimeDataExtractionStartTime =
parameters.hasAnyAttributes(SOURCE_START_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getStringByKeys(SOURCE_START_TIME_KEY),
ZoneId.systemDefault())
+ ?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+ parameters.getStringByKeys(SOURCE_START_TIME_KEY))
: Long.MIN_VALUE;
realtimeDataExtractionEndTime =
parameters.hasAnyAttributes(SOURCE_END_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getStringByKeys(SOURCE_END_TIME_KEY),
ZoneId.systemDefault())
+ ?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
+ parameters.getStringByKeys(SOURCE_END_TIME_KEY))
: Long.MAX_VALUE;
if (realtimeDataExtractionStartTime > realtimeDataExtractionEndTime) {
throw new PipeParameterNotValidException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 6c48e26445e..6953c81d630 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -451,6 +451,14 @@ public class DateTimeUtils {
.appendOptional(ISO_OFFSET_DATE_TIME_WITH_DOT_WITH_SPACE_NS)
.toFormatter();
+ public static long convertTimestampOrDatetimeStrToLongWithDefaultZone(String
timeStr) {
+ try {
+ return Long.parseLong(timeStr);
+ } catch (NumberFormatException e) {
+ return DateTimeUtils.convertDatetimeStrToLong(timeStr,
ZoneId.systemDefault());
+ }
+ }
+
public static long convertDatetimeStrToLong(String str, ZoneId zoneId) {
return convertDatetimeStrToLong(
str,