This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2b595a78 [Fix](source) Fixed incorrect Map value reading when
key/value is of DATE or DATETIME type (#419)
2b595a78 is described below
commit 2b595a7819378650052a5beac4c0b707876b23ab
Author: bingquanzhao <[email protected]>
AuthorDate: Mon Jul 15 10:16:33 2024 +0800
[Fix](source) Fixed incorrect Map value reading when key/value is of DATE
or DATETIME type (#419)
---
.../apache/doris/flink/serialization/RowBatch.java | 37 ++++++++++++++++++----
.../doris/flink/serialization/TestRowBatch.java | 29 +++++++++++++++++
2 files changed, 59 insertions(+), 7 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index c7afe7f5..1a42b2b9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -40,7 +40,10 @@ import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.DateDayReaderImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl;
import org.apache.arrow.vector.complex.impl.UnionMapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
@@ -105,6 +108,7 @@ public class RowBatch {
private final DateTimeFormatter dateTimeV2Formatter =
DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
public List<Row> getRowBatch() {
return rowBatch;
@@ -454,7 +458,11 @@ public class RowBatch {
reader.setPosition(rowIndex);
Map<String, Object> mapValue = new HashMap<>();
while (reader.next()) {
- mapValue.put(reader.key().readObject().toString(),
reader.value().readObject());
+ FieldReader keyReader = reader.key();
+ FieldReader valueReader = reader.value();
+ Object mapKeyObj = handleMapFieldReader(keyReader);
+ Object mapValueObj = handleMapFieldReader(valueReader);
+ mapValue.put(mapKeyObj.toString(), mapValueObj);
}
addValueToRow(rowIndex, mapValue);
break;
@@ -478,6 +486,16 @@ public class RowBatch {
return true;
}
+    /**
+     * Reads the entry that a map key/value reader is currently positioned on.
+     *
+     * <p>TIMESTAMP(micro) entries are decoded with {@link #longToLocalDateTime(long)} (the same
+     * conversion {@code getDateTime} applies to DATETIME columns), DATEDAY entries with
+     * {@link LocalDate#ofEpochDay(long)}; every other reader falls back to
+     * {@link FieldReader#readObject()}.
+     *
+     * @param reader the key or value reader of the current {@code UnionMapReader} entry
+     * @return the converted entry, or {@code null} when the entry is null
+     */
+    private Object handleMapFieldReader(FieldReader reader) {
+        // Short-circuit null entries: for DATEDAY, readObject() yields null and unboxing it via
+        // longValue() would throw a NullPointerException instead of producing a null map value.
+        if (!reader.isSet()) {
+            return null;
+        }
+        if (reader instanceof TimeStampMicroReaderImpl) {
+            // NOTE(review): confirm the Arrow version in use implements readLong() on
+            // TimeStampMicroReaderImpl; some generated readers only expose
+            // readLocalDateTime()/readObject() for this type.
+            return longToLocalDateTime(reader.readLong());
+        }
+        if (reader instanceof DateDayReaderImpl) {
+            return LocalDate.ofEpochDay(((Integer) reader.readObject()).longValue());
+        }
+        return reader.readObject();
+    }
+
@VisibleForTesting
public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector;
@@ -485,16 +503,21 @@ public class RowBatch {
return null;
}
long time = vector.get(rowIndex);
+ return longToLocalDateTime(time);
+ }
+
+    /**
+     * Converts a raw Arrow timestamp value to a {@link LocalDateTime} in {@code DEFAULT_ZONE_ID}.
+     *
+     * <p>The unit is inferred from the magnitude of {@code time}: fewer than 11 digits is read as
+     * epoch seconds, fewer than 14 digits as epoch milliseconds, anything larger as epoch
+     * microseconds. Magnitude is tested with {@code time / N == 0} (i.e. {@code |time| < N}) so
+     * negative, pre-1970 values are classified exactly like positive ones. Values below a
+     * threshold are ambiguous by construction (e.g. a microsecond value within roughly 2.8 hours
+     * of the epoch is read as seconds).
+     *
+     * @param time timestamp since the epoch, in seconds, milliseconds or microseconds
+     * @return the corresponding local date-time
+     */
+    @VisibleForTesting
+    public static LocalDateTime longToLocalDateTime(long time) {
         Instant instant;
-        if (time / 10000000000L == 0) { // datetime(0)
+        // "time / N == 0" holds iff |time| < N; a plain "time < N" would route every negative
+        // (pre-1970) millisecond/microsecond value into the seconds branch.
+        if (time / 10_000_000_000L == 0) { // |time| < 1e10: epoch seconds
             instant = Instant.ofEpochSecond(time);
-        } else if (time / 10000000000000L == 0) { // datetime(3)
+        } else if (time / 10_000_000_000_000L == 0) { // |time| < 1e13: epoch milliseconds
             instant = Instant.ofEpochMilli(time);
-        } else { // datetime(6)
-            instant = Instant.ofEpochSecond(time / 1000000, time % 1000000 * 1000);
+        } else { // epoch microseconds
+            // Instant.ofEpochSecond accepts a negative nano adjustment, so the truncating
+            // division/remainder pair is also correct for negative values.
+            instant = Instant.ofEpochSecond(time / 1_000_000, (time % 1_000_000) * 1_000);
         }
-        LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
-        return dateTime;
+        return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
     }
@VisibleForTesting
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 4824180c..e1f7b6e5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -76,6 +76,7 @@ import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -1629,4 +1630,32 @@ public class TestRowBatch {
thrown.expectMessage(startsWith("Get row offset"));
rowBatch.addValueToRow(10, null);
}
+
+    /**
+     * Verifies that {@code RowBatch.longToLocalDateTime} decodes second, millisecond and
+     * microsecond epoch values back to the expected local date-time in the default zone.
+     */
+    @Test
+    public void longToLocalDateTimeTest() {
+        // A fixed wall-clock value (rather than now()) keeps the test reproducible; it carries
+        // microsecond precision so the truncation applied by each unit branch is observable.
+        ZoneId defaultZoneId = ZoneId.systemDefault();
+        LocalDateTime expected = LocalDateTime.of(2024, 7, 15, 10, 16, 33, 123_456_000);
+
+        long secondTimestamp = expected.atZone(defaultZoneId).toEpochSecond();
+        long milliTimestamp = secondTimestamp * 1_000 + expected.getNano() / 1_000_000;
+        long microTimestamp = secondTimestamp * 1_000_000 + expected.getNano() / 1_000;
+
+        // Each raw value has a different magnitude, so each exercises a different unit branch.
+        Assert.assertEquals(
+                expected.truncatedTo(ChronoUnit.SECONDS),
+                RowBatch.longToLocalDateTime(secondTimestamp));
+        Assert.assertEquals(
+                expected.truncatedTo(ChronoUnit.MILLIS),
+                RowBatch.longToLocalDateTime(milliTimestamp));
+        Assert.assertEquals(expected, RowBatch.longToLocalDateTime(microTimestamp));
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]