This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7df4c3f [Fix] fix arrow read timestamp bug (#221)
7df4c3f is described below
commit 7df4c3f95093b4e033d3a09c05107236a5ab53aa
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Aug 5 09:52:06 2024 +0800
[Fix] fix arrow read timestamp bug (#221)
---
.../apache/doris/spark/serialization/RowBatch.java | 134 ++++++++++++++-------
.../doris/spark/serialization/TestRowBatch.java | 20 +--
2 files changed, 101 insertions(+), 53 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index feb9a4f..742d180 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -35,7 +35,7 @@ import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
@@ -47,6 +47,7 @@ import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.impl.UnionMapReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.sql.types.Decimal;
import org.slf4j.Logger;
@@ -78,37 +79,24 @@ import java.util.Objects;
*/
public class RowBatch {
private static final Logger logger =
LoggerFactory.getLogger(RowBatch.class);
+ private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
- private static final DateTimeFormatter DATE_TIME_FORMATTER = new
DateTimeFormatterBuilder()
- .appendPattern("yyyy-MM-dd HH:mm:ss")
- .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
- .toFormatter();
-
- public static class Row {
- private final List<Object> cols;
-
- Row(int colCount) {
- this.cols = new ArrayList<>(colCount);
- }
-
- List<Object> getCols() {
- return cols;
- }
-
- public void put(Object o) {
- cols.add(o);
- }
- }
-
+ private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+ private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd
HH:mm:ss.SSSSSS";
+ private final DateTimeFormatter dateTimeFormatter =
+ DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+ private final DateTimeFormatter dateTimeV2Formatter =
+ DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+ private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private final List<Row> rowBatch = new ArrayList<>();
+ private final ArrowStreamReader arrowStreamReader;
+ private final RootAllocator rootAllocator;
+ private final Schema schema;
// offset for iterate the rowBatch
private int offsetInRowBatch = 0;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
- private final List<Row> rowBatch = new ArrayList<>();
- private final ArrowStreamReader arrowStreamReader;
private List<FieldVector> fieldVectors;
- private final RootAllocator rootAllocator;
- private final Schema schema;
public RowBatch(TScanBatchResult nextResult, Schema schema) throws
DorisException {
this.schema = schema;
@@ -146,6 +134,19 @@ public class RowBatch {
}
}
+ public static LocalDateTime longToLocalDateTime(long time) {
+ Instant instant;
+ // Determine the timestamp accuracy and process it
+ if (time < 10_000_000_000L) { // Second timestamp
+ instant = Instant.ofEpochSecond(time);
+ } else if (time < 10_000_000_000_000L) { // milli second
+ instant = Instant.ofEpochMilli(time);
+ } else { // micro second
+ instant = Instant.ofEpochSecond(time / 1_000_000, (time %
1_000_000) * 1_000);
+ }
+ return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
+ }
+
public boolean hasNext() {
if (offsetInRowBatch >= readRowCount) {
rowBatch.clear();
@@ -358,9 +359,13 @@ public class RowBatch {
break;
case "DATETIME":
case "DATETIMEV2":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR)
- ||
mt.equals(Types.MinorType.TIMESTAMPMICRO),
+
+ Preconditions.checkArgument(
+ mt.equals(Types.MinorType.TIMESTAMPMICRO) ||
mt.equals(MinorType.VARCHAR) ||
+ mt.equals(MinorType.TIMESTAMPMILLI) ||
mt.equals(MinorType.TIMESTAMPSEC),
typeMismatchMessage(currentType, mt));
+ typeMismatchMessage(currentType, mt);
+
if (mt.equals(Types.MinorType.VARCHAR)) {
VarCharVector varCharVector = (VarCharVector)
curFieldVector;
for (int rowIndex = 0; rowIndex <
rowCountInOneBatch; rowIndex++) {
@@ -369,28 +374,23 @@ public class RowBatch {
continue;
}
String value = new
String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
- addValueToRow(rowIndex, value);
+ value = completeMilliseconds(value);
+ LocalDateTime parse =
LocalDateTime.parse(value, dateTimeV2Formatter);
+ addValueToRow(rowIndex, parse);
}
- } else {
- TimeStampMicroVector vector =
(TimeStampMicroVector) curFieldVector;
+ } else if (curFieldVector instanceof TimeStampVector) {
+ TimeStampVector timeStampVector =
(TimeStampVector) curFieldVector;
+
for (int rowIndex = 0; rowIndex <
rowCountInOneBatch; rowIndex++) {
- if (vector.isNull(rowIndex)) {
+ if (timeStampVector.isNull(rowIndex)) {
+
addValueToRow(rowIndex, null);
continue;
}
- long time = vector.get(rowIndex);
- Instant instant;
- if (time / 10000000000L == 0) { // datetime(0)
- instant = Instant.ofEpochSecond(time);
- } else if (time / 10000000000000L == 0) { //
datetime(3)
- instant = Instant.ofEpochMilli(time);
- } else { // datetime(6)
- instant = Instant.ofEpochSecond(time /
1000000, time % 1000000 * 1000);
- }
- LocalDateTime dateTime =
LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
- String formatted =
DATE_TIME_FORMATTER.format(dateTime);
- addValueToRow(rowIndex, formatted);
+ LocalDateTime dateTime = getDateTime(rowIndex,
timeStampVector);
+ addValueToRow(rowIndex, dateTime);
}
+
}
break;
case "CHAR":
@@ -511,4 +511,50 @@ public class RowBatch {
// do nothing
}
}
+
+ public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
+ TimeStampVector vector = (TimeStampVector) fieldVector;
+ if (vector.isNull(rowIndex)) {
+ return null;
+ }
+ // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded
to 6,
+ // and there is also a time zone problem in arrow, so use timestamp to
convert first
+ long time = vector.get(rowIndex);
+ return longToLocalDateTime(time);
+ }
+
+ public static String completeMilliseconds(String stringValue) {
+ if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
+ return stringValue;
+ }
+
+ if (stringValue.length() < DATETIME_PATTERN.length()) {
+ return stringValue;
+ }
+
+ StringBuilder sb = new StringBuilder(stringValue);
+ if (stringValue.length() == DATETIME_PATTERN.length()) {
+ sb.append(".");
+ }
+ while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
+ sb.append(0);
+ }
+ return sb.toString();
+ }
+
+ public static class Row {
+ private final List<Object> cols;
+
+ Row(int colCount) {
+ this.cols = new ArrayList<>(colCount);
+ }
+
+ List<Object> getCols() {
+ return cols;
+ }
+
+ public void put(Object o) {
+ cols.add(o);
+ }
+ }
}
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index 0c83050..0320cd8 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -273,7 +273,7 @@ public class TestRowBatch {
(float) 1.1,
(double) 1.1,
Date.valueOf("2008-08-08"),
- "2008-08-08 00:00:00",
+ LocalDateTime.of(2008, 8, 8, 0, 0, 0),
Decimal.apply(1234L, 4, 2),
"char1"
);
@@ -287,7 +287,7 @@ public class TestRowBatch {
(float) 2.2,
(double) 2.2,
Date.valueOf("1900-08-08"),
- "1900-08-08 00:00:00",
+ LocalDateTime.of(1900, 8, 8, 0, 0, 0),
Decimal.apply(8888L, 4, 2),
"char2"
);
@@ -301,7 +301,7 @@ public class TestRowBatch {
(float) 3.3,
(double) 3.3,
Date.valueOf("2100-08-08"),
- "2100-08-08 00:00:00",
+ LocalDateTime.of(2100, 8, 8, 0, 0, 0),
Decimal.apply(10L, 2, 0),
"char3"
);
@@ -832,16 +832,17 @@ public class TestRowBatch {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
- Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(0));
- Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(1));
+ Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0),
actualRow0.get(0));
+ Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0),
actualRow0.get(1));
List<Object> actualRow1 = rowBatch.next();
- Assert.assertEquals("2024-03-20 00:00:01", actualRow1.get(0));
- Assert.assertEquals("2024-03-20 00:00:00.123", actualRow1.get(1));
+ Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1),
actualRow1.get(0));
+ Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000),
actualRow1.get(1));
List<Object> actualRow2 = rowBatch.next();
- Assert.assertEquals("2024-03-20 00:00:02", actualRow2.get(0));
- Assert.assertEquals("2024-03-20 00:00:00.123456", actualRow2.get(1));
+ Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2),
actualRow2.get(0));
+ Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000),
actualRow2.get(1));
+
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
@@ -906,6 +907,7 @@ public class TestRowBatch {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
+
Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
List<Object> actualRow1 = rowBatch.next();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]