This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df4c3f  [Fix] fix arrow read timestamp bug (#221)
7df4c3f is described below

commit 7df4c3f95093b4e033d3a09c05107236a5ab53aa
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Aug 5 09:52:06 2024 +0800

    [Fix] fix arrow read timestamp bug (#221)
---
 .../apache/doris/spark/serialization/RowBatch.java | 134 ++++++++++++++-------
 .../doris/spark/serialization/TestRowBatch.java    |  20 +--
 2 files changed, 101 insertions(+), 53 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index feb9a4f..742d180 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -35,7 +35,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarBinaryVector;
@@ -47,6 +47,7 @@ import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
@@ -78,37 +79,24 @@ import java.util.Objects;
  */
 public class RowBatch {
     private static final Logger logger = 
LoggerFactory.getLogger(RowBatch.class);
+    private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
 
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = new 
DateTimeFormatterBuilder()
-            .appendPattern("yyyy-MM-dd HH:mm:ss")
-            .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
-            .toFormatter();
-
-    public static class Row {
-        private final List<Object> cols;
-
-        Row(int colCount) {
-            this.cols = new ArrayList<>(colCount);
-        }
-
-        List<Object> getCols() {
-            return cols;
-        }
-
-        public void put(Object o) {
-            cols.add(o);
-        }
-    }
-
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd 
HH:mm:ss.SSSSSS";
+    private final DateTimeFormatter dateTimeFormatter =
+            DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+    private final DateTimeFormatter dateTimeV2Formatter =
+            DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private final List<Row> rowBatch = new ArrayList<>();
+    private final ArrowStreamReader arrowStreamReader;
+    private final RootAllocator rootAllocator;
+    private final Schema schema;
     // offset for iterate the rowBatch
     private int offsetInRowBatch = 0;
     private int rowCountInOneBatch = 0;
     private int readRowCount = 0;
-    private final List<Row> rowBatch = new ArrayList<>();
-    private final ArrowStreamReader arrowStreamReader;
     private List<FieldVector> fieldVectors;
-    private final RootAllocator rootAllocator;
-    private final Schema schema;
 
     public RowBatch(TScanBatchResult nextResult, Schema schema) throws 
DorisException {
         this.schema = schema;
@@ -146,6 +134,19 @@ public class RowBatch {
         }
     }
 
+    public static LocalDateTime longToLocalDateTime(long time) {
+        Instant instant;
+        // Determine the timestamp accuracy and process it
+        if (time < 10_000_000_000L) { // Second timestamp
+            instant = Instant.ofEpochSecond(time);
+        } else if (time < 10_000_000_000_000L) { // milli second
+            instant = Instant.ofEpochMilli(time);
+        } else { // micro second
+            instant = Instant.ofEpochSecond(time / 1_000_000, (time % 
1_000_000) * 1_000);
+        }
+        return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
+    }
+
     public boolean hasNext() {
         if (offsetInRowBatch >= readRowCount) {
             rowBatch.clear();
@@ -358,9 +359,13 @@ public class RowBatch {
                         break;
                     case "DATETIME":
                     case "DATETIMEV2":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR)
-                                        || 
mt.equals(Types.MinorType.TIMESTAMPMICRO),
+
+                        Preconditions.checkArgument(
+                                mt.equals(Types.MinorType.TIMESTAMPMICRO) || 
mt.equals(MinorType.VARCHAR) ||
+                                        mt.equals(MinorType.TIMESTAMPMILLI) || 
mt.equals(MinorType.TIMESTAMPSEC),
                                 typeMismatchMessage(currentType, mt));
+                        typeMismatchMessage(currentType, mt);
+
                         if (mt.equals(Types.MinorType.VARCHAR)) {
                             VarCharVector varCharVector = (VarCharVector) 
curFieldVector;
                             for (int rowIndex = 0; rowIndex < 
rowCountInOneBatch; rowIndex++) {
@@ -369,28 +374,23 @@ public class RowBatch {
                                     continue;
                                 }
                                 String value = new 
String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
-                                addValueToRow(rowIndex, value);
+                                 value = completeMilliseconds(value);
+                                LocalDateTime parse = 
LocalDateTime.parse(value, dateTimeV2Formatter);
+                                addValueToRow(rowIndex, parse);
                             }
-                        } else {
-                            TimeStampMicroVector vector = 
(TimeStampMicroVector) curFieldVector;
+                        } else if (curFieldVector instanceof TimeStampVector) {
+                            TimeStampVector timeStampVector = 
(TimeStampVector) curFieldVector;
+
                             for (int rowIndex = 0; rowIndex < 
rowCountInOneBatch; rowIndex++) {
-                                if (vector.isNull(rowIndex)) {
+                                if (timeStampVector.isNull(rowIndex)) {
+
                                     addValueToRow(rowIndex, null);
                                     continue;
                                 }
-                                long time = vector.get(rowIndex);
-                                Instant instant;
-                                if (time / 10000000000L == 0) { // datetime(0)
-                                    instant = Instant.ofEpochSecond(time);
-                                } else if (time / 10000000000000L == 0) { // 
datetime(3)
-                                    instant = Instant.ofEpochMilli(time);
-                                } else { // datetime(6)
-                                    instant = Instant.ofEpochSecond(time / 
1000000, time % 1000000 * 1000);
-                                }
-                                LocalDateTime dateTime = 
LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
-                                String formatted = 
DATE_TIME_FORMATTER.format(dateTime);
-                                addValueToRow(rowIndex, formatted);
+                                LocalDateTime dateTime = getDateTime(rowIndex, 
timeStampVector);
+                                addValueToRow(rowIndex, dateTime);
                             }
+
                         }
                         break;
                     case "CHAR":
@@ -511,4 +511,50 @@ public class RowBatch {
             // do nothing
         }
     }
+
+    public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
+        TimeStampVector vector = (TimeStampVector) fieldVector;
+        if (vector.isNull(rowIndex)) {
+            return null;
+        }
+        // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded 
to 6,
+        // and there is also a time zone problem in arrow, so use timestamp to 
convert first
+        long time = vector.get(rowIndex);
+        return longToLocalDateTime(time);
+    }
+
+    public static String completeMilliseconds(String stringValue) {
+        if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
+            return stringValue;
+        }
+
+        if (stringValue.length() < DATETIME_PATTERN.length()) {
+            return stringValue;
+        }
+
+        StringBuilder sb = new StringBuilder(stringValue);
+        if (stringValue.length() == DATETIME_PATTERN.length()) {
+            sb.append(".");
+        }
+        while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
+            sb.append(0);
+        }
+        return sb.toString();
+    }
+
+    public static class Row {
+        private final List<Object> cols;
+
+        Row(int colCount) {
+            this.cols = new ArrayList<>(colCount);
+        }
+
+        List<Object> getCols() {
+            return cols;
+        }
+
+        public void put(Object o) {
+            cols.add(o);
+        }
+    }
 }
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index 0c83050..0320cd8 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -273,7 +273,7 @@ public class TestRowBatch {
                 (float) 1.1,
                 (double) 1.1,
                 Date.valueOf("2008-08-08"),
-                "2008-08-08 00:00:00",
+                LocalDateTime.of(2008, 8, 8, 0, 0, 0),
                 Decimal.apply(1234L, 4, 2),
                 "char1"
         );
@@ -287,7 +287,7 @@ public class TestRowBatch {
                 (float) 2.2,
                 (double) 2.2,
                 Date.valueOf("1900-08-08"),
-                "1900-08-08 00:00:00",
+                LocalDateTime.of(1900, 8, 8, 0, 0, 0),
                 Decimal.apply(8888L, 4, 2),
                 "char2"
         );
@@ -301,7 +301,7 @@ public class TestRowBatch {
                 (float) 3.3,
                 (double) 3.3,
                 Date.valueOf("2100-08-08"),
-                "2100-08-08 00:00:00",
+                LocalDateTime.of(2100, 8, 8, 0, 0, 0),
                 Decimal.apply(10L, 2, 0),
                 "char3"
         );
@@ -832,16 +832,17 @@ public class TestRowBatch {
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
-        Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(0));
-        Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(1));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), 
actualRow0.get(0));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), 
actualRow0.get(1));
 
         List<Object> actualRow1 = rowBatch.next();
-        Assert.assertEquals("2024-03-20 00:00:01", actualRow1.get(0));
-        Assert.assertEquals("2024-03-20 00:00:00.123", actualRow1.get(1));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1), 
actualRow1.get(0));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000), 
actualRow1.get(1));
 
         List<Object> actualRow2 = rowBatch.next();
-        Assert.assertEquals("2024-03-20 00:00:02", actualRow2.get(0));
-        Assert.assertEquals("2024-03-20 00:00:00.123456", actualRow2.get(1));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2), 
actualRow2.get(0));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000), 
actualRow2.get(1));
+
 
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
@@ -906,6 +907,7 @@ public class TestRowBatch {
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
+
         Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
 
         List<Object> actualRow1 = rowBatch.next();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to