This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 02254b9c0e [Fix][Connector-V2] Fixed missing timestamp accuracy of
starrocks connector (#9096)
02254b9c0e is described below
commit 02254b9c0eb3df4871400ad160f5f2095cfee3ab
Author: corgy-w <[email protected]>
AuthorDate: Wed Apr 2 17:20:44 2025 +0800
[Fix][Connector-V2] Fixed missing timestamp accuracy of starrocks connector
(#9096)
---
.../serialize/StarRocksBaseSerializer.java | 20 +++++++++------
.../serialize/StarRocksJsonSerializerTest.java | 29 +++++++++++++++++-----
2 files changed, 35 insertions(+), 14 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
index 4321cb46c8..04b5b4ec21 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
@@ -19,25 +19,29 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
-import lombok.Builder;
-
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
public class StarRocksBaseSerializer {
- @Builder.Default private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
+ private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
- @Builder.Default
- private DateTimeUtils.Formatter dateTimeFormatter =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ private DateTimeFormatter dateTimeFormatter =
+ new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
+ .optionalStart()
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+ .toFormatter();
- @Builder.Default private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
+ private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
protected Object convert(SeaTunnelDataType dataType, Object val) {
if (val == null) {
@@ -59,7 +63,7 @@ public class StarRocksBaseSerializer {
case TIME:
return TimeUtils.toString((LocalTime) val, timeFormatter);
case TIMESTAMP:
- return DateTimeUtils.toString((LocalDateTime) val,
dateTimeFormatter);
+ return ((LocalDateTime) val).format(dateTimeFormatter);
case ARRAY:
case MAP:
return JsonUtils.toJsonString(val);
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
index 6e0d947644..54c4401b8d 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -27,30 +28,46 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
import java.util.Collections;
public class StarRocksJsonSerializerTest {
+ private DateTimeFormatter dateTimeFormatter =
+ new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
+ .optionalStart()
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+ .toFormatter();
+
@Test
public void serialize() {
- String[] filedNames = {"id", "name", "array", "map"};
- SeaTunnelDataType<?>[] filedTypes = {
+ String[] fieldNames = {"id", "name", "array", "map", "timestamp"};
+ SeaTunnelDataType<?>[] fieldTypes = {
BasicType.LONG_TYPE,
BasicType.STRING_TYPE,
ArrayType.STRING_ARRAY_TYPE,
- new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
};
- SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(filedNames,
filedTypes);
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames,
fieldTypes);
StarRocksJsonSerializer starRocksJsonSerializer =
new StarRocksJsonSerializer(seaTunnelRowType, false);
Object[] fields = {
- 1, "Tom", new String[] {"tag1", "tag2"},
Collections.singletonMap("key1", "value1")
+ 1,
+ "Tom",
+ new String[] {"tag1", "tag2"},
+ Collections.singletonMap("key1", "value1"),
+ LocalDateTime.parse("2024-01-25 07:55:45.123", dateTimeFormatter)
};
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
String jsonString = starRocksJsonSerializer.serialize(seaTunnelRow);
Assertions.assertEquals(
-
"{\"id\":1,\"name\":\"Tom\",\"array\":[\"tag1\",\"tag2\"],\"map\":{\"key1\":\"value1\"}}",
+
"{\"id\":1,\"name\":\"Tom\",\"array\":[\"tag1\",\"tag2\"],\"map\":{\"key1\":\"value1\"},\"timestamp\":\"2024-01-25
07:55:45.123\"}",
jsonString);
}
}