This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 27bf18e3 [Fix] fix arrow  read timestamp bug (#446)
27bf18e3 is described below

commit 27bf18e39c6f47ba01bbc34adf66795c2c9aedc1
Author: wudi <[email protected]>
AuthorDate: Fri Aug 2 11:16:57 2024 +0800

    [Fix] fix arrow  read timestamp bug (#446)
---
 .../apache/doris/flink/serialization/RowBatch.java |  30 +++---
 .../doris/flink/serialization/TestRowBatch.java    | 109 +++++++++++++++++++++
 2 files changed, 127 insertions(+), 12 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 1a42b2b9..38c63b77 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -31,7 +31,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarBinaryVector;
@@ -334,10 +334,6 @@ public class RowBatch {
                 }
                 break;
             case "DATETIME":
-                if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
-                        && !minorType.equals(Types.MinorType.VARCHAR)) {
-                    return false;
-                }
                 if (minorType.equals(Types.MinorType.VARCHAR)) {
                     VarCharVector varCharVector = (VarCharVector) fieldVector;
                     if (varCharVector.isNull(rowIndex)) {
@@ -347,16 +343,18 @@ public class RowBatch {
                     String stringValue = new String(varCharVector.get(rowIndex));
                     LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter);
                     addValueToRow(rowIndex, parse);
-                } else {
+                } else if (fieldVector instanceof TimeStampVector) {
                     LocalDateTime dateTime = getDateTime(rowIndex, fieldVector);
                     addValueToRow(rowIndex, dateTime);
+                } else {
+                    logger.error(
+                            "Unsupported type for DATETIME, minorType {}, class is {}",
+                            minorType.name(),
+                            fieldVector == null ? null : fieldVector.getClass());
+                    return false;
                 }
                 break;
             case "DATETIMEV2":
-                if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
-                        && !minorType.equals(Types.MinorType.VARCHAR)) {
-                    return false;
-                }
                 if (minorType.equals(Types.MinorType.VARCHAR)) {
                     VarCharVector varCharVector = (VarCharVector) fieldVector;
                     if (varCharVector.isNull(rowIndex)) {
@@ -367,9 +365,15 @@ public class RowBatch {
                     stringValue = completeMilliseconds(stringValue);
                     LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter);
                     addValueToRow(rowIndex, parse);
-                } else {
+                } else if (fieldVector instanceof TimeStampVector) {
                     LocalDateTime dateTime = getDateTime(rowIndex, fieldVector);
                     addValueToRow(rowIndex, dateTime);
+                } else {
+                    logger.error(
+                            "Unsupported type for DATETIMEV2, minorType {}, class is {}",
+                            minorType.name(),
+                            fieldVector == null ? null : fieldVector.getClass());
+                    return false;
                 }
                 break;
             case "LARGEINT":
@@ -498,10 +502,12 @@ public class RowBatch {
 
     @VisibleForTesting
     public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
-        TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector;
+        TimeStampVector vector = (TimeStampVector) fieldVector;
         if (vector.isNull(rowIndex)) {
             return null;
         }
+        // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded to 6,
+        // and there is also a time zone problem in arrow, so use timestamp to convert first
         long time = vector.get(rowIndex);
         return longToLocalDateTime(time);
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index e1f7b6e5..3d9ac261 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -34,6 +34,8 @@ import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarBinaryVector;
@@ -1331,9 +1333,23 @@ public class TestRowBatch {
         flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIME", null);
         Assert.assertFalse(flag);
 
+        flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIME", null);
+        Assert.assertFalse(flag);
+
+        IntVector intVector1 = new IntVector("test", new RootAllocator(Integer.MAX_VALUE));
+        intVector1.setNull(0);
+        flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIME", intVector1);
+        Assert.assertFalse(flag);
+
         flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIMEV2", null);
         Assert.assertFalse(flag);
 
+        flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIMEV2", null);
+        Assert.assertFalse(flag);
+
+        flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIMEV2", intVector1);
+        Assert.assertFalse(flag);
+
         flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "LARGEINT", null);
         Assert.assertFalse(flag);
 
@@ -1658,4 +1674,97 @@ public class TestRowBatch {
 
         Assert.assertArrayEquals(expectArray, resultArray);
     }
+
+    @Test
+    public void timestampVector() throws IOException, DorisException {
+        List<Field> childrenBuilder = new ArrayList<>();
+        childrenBuilder.add(
+                new Field(
+                        "k0",
+                        FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
+                        null));
+        childrenBuilder.add(
+                new Field(
+                        "k1",
+                        FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
+                        null));
+        childrenBuilder.add(
+                new Field(
+                        "k2",
+                        FieldType.nullable(new ArrowType.Timestamp(TimeUnit.SECOND, null)),
+                        null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(1);
+
+        FieldVector vector = root.getVector("k0");
+        TimeStampMicroVector mircoVec = (TimeStampMicroVector) vector;
+        mircoVec.allocateNew(1);
+        mircoVec.setIndexDefined(0);
+        mircoVec.setSafe(0, 1721892143586123L);
+        vector.setValueCount(1);
+
+        vector = root.getVector("k1");
+        TimeStampMilliVector milliVector = (TimeStampMilliVector) vector;
+        milliVector.allocateNew(1);
+        milliVector.setIndexDefined(0);
+        milliVector.setSafe(0, 1721892143586L);
+        vector.setValueCount(1);
+
+        vector = root.getVector("k2");
+        TimeStampSecVector secVector = (TimeStampSecVector) vector;
+        secVector.allocateNew(1);
+        secVector.setIndexDefined(0);
+        secVector.setSafe(0, 1721892143L);
+        vector.setValueCount(1);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":[{\"type\":\"DATETIME\",\"name\":\"k0\",\"comment\":\"\"}, {\"type\":\"DATETIME\",\"name\":\"k1\",\"comment\":\"\"}, {\"type\":\"DATETIME\",\"name\":\"k2\",\"comment\":\"\"}],"
+                        + "\"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+        List<Object> next = rowBatch.next();
+        Assert.assertEquals(next.size(), 3);
+        Assert.assertEquals(
+                next.get(0),
+                LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586123000)
+                        .atZone(ZoneId.of("UTC+8"))
+                        .withZoneSameInstant(ZoneId.systemDefault())
+                        .toLocalDateTime());
+        Assert.assertEquals(
+                next.get(1),
+                LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586000000)
+                        .atZone(ZoneId.of("UTC+8"))
+                        .withZoneSameInstant(ZoneId.systemDefault())
+                        .toLocalDateTime());
+        Assert.assertEquals(
+                next.get(2),
+                LocalDateTime.of(2024, 7, 25, 15, 22, 23, 0)
+                        .atZone(ZoneId.of("UTC+8"))
+                        .withZoneSameInstant(ZoneId.systemDefault())
+                        .toLocalDateTime());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to