This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch 1.1.2-pick
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/1.1.2-pick by this push:
     new c9389f06 fix
c9389f06 is described below

commit c9389f06752c47150bf7a7a9e5760e61910a8c39
Author: wudi <[email protected]>
AuthorDate: Tue Feb 10 12:15:32 2026 +0800

    fix
---
 flink-doris-connector/pom.xml                      |   2 +-
 .../apache/doris/flink/serialization/RowBatch.java | 644 +++++++++++++++------
 .../org/apache/doris/flink/util/FastDateUtil.java  |  90 +++
 3 files changed, 554 insertions(+), 182 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 0847d4a0..a9737ee2 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -27,7 +27,7 @@ under the License.
     </parent>
     <groupId>org.apache.doris</groupId>
     <artifactId>flink-doris-connector-1.14_2.12</artifactId>
-    <version>1.1.2</version>
+    <version>1.1.3</version>
     <name>Flink Doris Connector</name>
     <url>https://doris.apache.org/</url>
     <licenses>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index a3c4d673..88db7bfb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -18,44 +18,62 @@
 package org.apache.doris.flink.serialization;
 
 import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.util.FastDateUtil;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Decimal256Vector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.DateDayReaderImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl;
+import org.apache.arrow.vector.complex.impl.UnionMapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.Types;
-import org.apache.flink.util.Preconditions;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
-/**
- * row batch data container.
- */
+/** row batch data container. */
 public class RowBatch {
-    private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(RowBatch.class);
 
     public static class Row {
-        private List<Object> cols;
+        private final List<Object> cols;
 
         Row(int colCount) {
             this.cols = new ArrayList<>(colCount);
@@ -71,18 +89,24 @@ public class RowBatch {
     }
 
     // offset for iterate the rowBatch
-    private int offsetInRowBatch = 0;
+    private int offsetInRowBatch;
     private int rowCountInOneBatch = 0;
     private int readRowCount = 0;
-    private List<Row> rowBatch = new ArrayList<>();
-    private final ArrowStreamReader arrowStreamReader;
+    private final List<Row> rowBatch = new ArrayList<>();
+    private final ArrowReader arrowStreamReader;
     private VectorSchemaRoot root;
     private List<FieldVector> fieldVectors;
     private RootAllocator rootAllocator;
     private final Schema schema;
-
-    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd 
HH:mm:ss.SSSSSS";
+    private static final String DATE_PATTERN = "yyyy-MM-dd";
+    private final DateTimeFormatter dateTimeFormatter =
+            DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+    private final DateTimeFormatter dateTimeV2Formatter =
+            DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern(DATE_PATTERN);
+    private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
 
     public List<Row> getRowBatch() {
         return rowBatch;
@@ -91,24 +115,63 @@ public class RowBatch {
     public RowBatch(TScanBatchResult nextResult, Schema schema) {
         this.schema = schema;
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
-        this.arrowStreamReader = new ArrowStreamReader(
-                new ByteArrayInputStream(nextResult.getRows()),
-                rootAllocator
-        );
+        this.arrowStreamReader =
+                new ArrowStreamReader(
+                        new ByteArrayInputStream(nextResult.getRows()), 
rootAllocator);
+        this.offsetInRowBatch = 0;
+    }
+
+    public RowBatch(ArrowReader nextResult, Schema schema) {
+        this.schema = schema;
+        this.arrowStreamReader = nextResult;
         this.offsetInRowBatch = 0;
     }
 
-    public RowBatch readArrow() throws DorisException {
+    public RowBatch readFlightArrow() {
+        try {
+            this.root = arrowStreamReader.getVectorSchemaRoot();
+            fieldVectors = root.getFieldVectors();
+            if (fieldVectors.size() > schema.size()) {
+                logger.error(
+                        "Schema size '{}' is not equal to arrow field size 
'{}'.",
+                        fieldVectors.size(),
+                        schema.size());
+                throw new DorisException(
+                        "Load Doris data failed, schema size of fetch data is 
wrong.");
+            }
+            if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
+                logger.debug("One batch in arrow has no data.");
+                return null;
+            }
+            rowCountInOneBatch = root.getRowCount();
+            for (int i = 0; i < rowCountInOneBatch; ++i) {
+                rowBatch.add(new RowBatch.Row(fieldVectors.size()));
+            }
+            convertArrowToRowBatch();
+            readRowCount += root.getRowCount();
+            return this;
+        } catch (DorisException e) {
+            logger.error("Read Doris Data failed because: ", e);
+            throw new DorisRuntimeException(e.getMessage());
+        } catch (IOException e) {
+            return this;
+        }
+    }
+
+    public RowBatch readArrow() {
         try {
             this.root = arrowStreamReader.getVectorSchemaRoot();
             while (arrowStreamReader.loadNextBatch()) {
                 fieldVectors = root.getFieldVectors();
                 if (fieldVectors.size() > schema.size()) {
-                    logger.error("Schema size '{}' is not equal to arrow field 
size '{}'.",
-                            fieldVectors.size(), schema.size());
-                    throw new DorisException("Load Doris data failed, schema 
size of fetch data is wrong.");
+                    logger.error(
+                            "Data schema size '{}' should not be bigger than 
arrow field size '{}'",
+                            schema.size(),
+                            fieldVectors.size());
+                    throw new DorisException(
+                            "Load Doris data failed, schema size of fetch data 
is wrong.");
                 }
-                if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
+                if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
                     logger.debug("One batch in arrow has no data.");
                     continue;
                 }
@@ -123,23 +186,21 @@ public class RowBatch {
             return this;
         } catch (Exception e) {
             logger.error("Read Doris Data failed because: ", e);
-            throw new DorisException(e.getMessage());
+            throw new DorisRuntimeException(e.getMessage());
         } finally {
             close();
         }
     }
 
     public boolean hasNext() {
-        if (offsetInRowBatch < readRowCount) {
-            return true;
-        }
-        return false;
+        return offsetInRowBatch < readRowCount;
     }
 
-    private void addValueToRow(int rowIndex, Object obj) {
+    @VisibleForTesting
+    public void addValueToRow(int rowIndex, Object obj) {
         if (rowIndex > rowCountInOneBatch) {
-            String errMsg = "Get row offset: " + rowIndex + " larger than row 
size: " +
-                    rowCountInOneBatch;
+            String errMsg =
+                    "Get row offset: " + rowIndex + " larger than row size: " 
+ rowCountInOneBatch;
             logger.error(errMsg);
             throw new NoSuchElementException(errMsg);
         }
@@ -149,176 +210,397 @@ public class RowBatch {
     public void convertArrowToRowBatch() throws DorisException {
         try {
             for (int col = 0; col < fieldVectors.size(); col++) {
-                FieldVector curFieldVector = fieldVectors.get(col);
-                Types.MinorType mt = curFieldVector.getMinorType();
-
+                FieldVector fieldVector = fieldVectors.get(col);
+                MinorType minorType = fieldVector.getMinorType();
                 final String currentType = schema.get(col).getType();
-                switch (currentType) {
-                    case "NULL_TYPE":
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            addValueToRow(rowIndex, null);
-                        }
-                        break;
-                    case "BOOLEAN":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.BIT),
-                                typeMismatchMessage(currentType, mt));
-                        BitVector bitVector = (BitVector) curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = bitVector.isNull(rowIndex) ? 
null : bitVector.get(rowIndex) != 0;
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "TINYINT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT),
-                                typeMismatchMessage(currentType, mt));
-                        TinyIntVector tinyIntVector = (TinyIntVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = tinyIntVector.isNull(rowIndex) 
? null : tinyIntVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "SMALLINT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.SMALLINT),
-                                typeMismatchMessage(currentType, mt));
-                        SmallIntVector smallIntVector = (SmallIntVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = 
smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "INT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.INT),
-                                typeMismatchMessage(currentType, mt));
-                        IntVector intVector = (IntVector) curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = intVector.isNull(rowIndex) ? 
null : intVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "BIGINT":
+                final String colName = schema.get(col).getName();
+                for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
+                    boolean passed = doConvert(col, rowIndex, minorType, 
currentType, fieldVector);
+                    if (!passed) {
+                        throw new IllegalArgumentException(
+                                "FLINK type is "
+                                        + currentType
+                                        + ", but arrow type is "
+                                        + minorType.name()
+                                        + ", column Name is "
+                                        + colName);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            close();
+            throw e;
+        }
+    }
 
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.BIGINT),
-                                typeMismatchMessage(currentType, mt));
-                        BigIntVector bigIntVector = (BigIntVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = bigIntVector.isNull(rowIndex) 
? null : bigIntVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "FLOAT":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4),
-                                typeMismatchMessage(currentType, mt));
-                        Float4Vector float4Vector = (Float4Vector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = float4Vector.isNull(rowIndex) 
? null : float4Vector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
-                        break;
-                    case "TIME":
-                    case "DOUBLE":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT8),
-                                typeMismatchMessage(currentType, mt));
-                        Float8Vector float8Vector = (Float8Vector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = float8Vector.isNull(rowIndex) 
? null : float8Vector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
+    @VisibleForTesting
+    public boolean doConvert(
+            int col, int rowIndex, MinorType minorType, String currentType, 
FieldVector fieldVector)
+            throws DorisException {
+        switch (currentType) {
+            case "NULL_TYPE":
+                break;
+            case "BOOLEAN":
+                if (!minorType.equals(MinorType.BIT)) {
+                    return false;
+                }
+                BitVector bitVector = (BitVector) fieldVector;
+                Object fieldValue =
+                        bitVector.isNull(rowIndex) ? null : 
bitVector.get(rowIndex) != 0;
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "TINYINT":
+                if (!minorType.equals(MinorType.TINYINT)) {
+                    return false;
+                }
+                TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
+                fieldValue = tinyIntVector.isNull(rowIndex) ? null : 
tinyIntVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "SMALLINT":
+                if (!minorType.equals(MinorType.SMALLINT)) {
+                    return false;
+                }
+                SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
+                fieldValue = smallIntVector.isNull(rowIndex) ? null : 
smallIntVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "INT":
+                if (!minorType.equals(MinorType.INT)) {
+                    return false;
+                }
+                IntVector intVector = (IntVector) fieldVector;
+                fieldValue = intVector.isNull(rowIndex) ? null : 
intVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "BIGINT":
+                if (!minorType.equals(MinorType.BIGINT)) {
+                    return false;
+                }
+                BigIntVector bigIntVector = (BigIntVector) fieldVector;
+                fieldValue = bigIntVector.isNull(rowIndex) ? null : 
bigIntVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "FLOAT":
+                if (!minorType.equals(MinorType.FLOAT4)) {
+                    return false;
+                }
+                Float4Vector float4Vector = (Float4Vector) fieldVector;
+                fieldValue = float4Vector.isNull(rowIndex) ? null : 
float4Vector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "TIME":
+            case "DOUBLE":
+                if (!minorType.equals(MinorType.FLOAT8)) {
+                    return false;
+                }
+                Float8Vector float8Vector = (Float8Vector) fieldVector;
+                fieldValue = float8Vector.isNull(rowIndex) ? null : 
float8Vector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "BINARY":
+                if (!minorType.equals(MinorType.VARBINARY)) {
+                    return false;
+                }
+                VarBinaryVector varBinaryVector = (VarBinaryVector) 
fieldVector;
+                fieldValue =
+                        varBinaryVector.isNull(rowIndex) ? null : 
varBinaryVector.get(rowIndex);
+                addValueToRow(rowIndex, fieldValue);
+                break;
+            case "DECIMAL":
+            case "DECIMALV2":
+            case "DECIMAL32":
+            case "DECIMAL64":
+            case "DECIMAL128I":
+            case "DECIMAL128":
+                if (!minorType.equals(MinorType.DECIMAL)) {
+                    return false;
+                }
+                DecimalVector decimalVector = (DecimalVector) fieldVector;
+                if (decimalVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                BigDecimal value = 
decimalVector.getObject(rowIndex).stripTrailingZeros();
+                addValueToRow(rowIndex, value);
+                break;
+            case "DECIMAL256":
+                if (!minorType.equals(MinorType.DECIMAL256)) {
+                    return false;
+                }
+                Decimal256Vector decimal256Vector = (Decimal256Vector) 
fieldVector;
+                if (decimal256Vector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                BigDecimal value256 = 
decimal256Vector.getObject(rowIndex).stripTrailingZeros();
+                addValueToRow(rowIndex, value256);
+                break;
+            case "DATE":
+            case "DATEV2":
+                if (!minorType.equals(MinorType.DATEDAY) && 
!minorType.equals(MinorType.VARCHAR)) {
+                    return false;
+                }
+                if (minorType.equals(MinorType.VARCHAR)) {
+                    VarCharVector date = (VarCharVector) fieldVector;
+                    if (date.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
                         break;
-                    case "BINARY":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARBINARY),
-                                typeMismatchMessage(currentType, mt));
-                        VarBinaryVector varBinaryVector = (VarBinaryVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = 
varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
-                            addValueToRow(rowIndex, fieldValue);
-                        }
+                    }
+                    String stringValue = new String(date.get(rowIndex), 
StandardCharsets.UTF_8);
+                    LocalDate localDate = 
FastDateUtil.fastParseDate(stringValue, DATE_PATTERN);
+                    addValueToRow(rowIndex, localDate);
+                } else {
+                    DateDayVector date = (DateDayVector) fieldVector;
+                    if (date.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
                         break;
-                    case "DECIMAL":
-                    case "DECIMALV2":
-                    case "DECIMAL32":
-                    case "DECIMAL64":
-                    case "DECIMAL128I":
-                    case "DECIMAL128":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
-                                typeMismatchMessage(currentType, mt));
-                        DecimalVector decimalVector = (DecimalVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (decimalVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            BigDecimal value = 
decimalVector.getObject(rowIndex).stripTrailingZeros();
-                            addValueToRow(rowIndex, value);
-                        }
+                    }
+                    LocalDate localDate = 
LocalDate.ofEpochDay(date.get(rowIndex));
+                    addValueToRow(rowIndex, localDate);
+                }
+                break;
+            case "DATETIME":
+                if (minorType.equals(MinorType.VARCHAR)) {
+                    VarCharVector varCharVector = (VarCharVector) fieldVector;
+                    if (varCharVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
                         break;
-                    case "DATE":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector date = (VarCharVector) curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (date.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new String(date.get(rowIndex));
-                            LocalDate localDate = LocalDate.parse(value, 
dateFormatter);
-                            addValueToRow(rowIndex, localDate);
-                        }
+                    }
+                    String stringValue =
+                            new String(varCharVector.get(rowIndex), 
StandardCharsets.UTF_8);
+                    stringValue = completeMilliseconds(stringValue);
+                    LocalDateTime parse =
+                            FastDateUtil.fastParseDateTime(stringValue, 
DATETIME_PATTERN);
+                    addValueToRow(rowIndex, parse);
+                } else if (fieldVector instanceof TimeStampVector) {
+                    LocalDateTime dateTime = getDateTime(rowIndex, 
fieldVector);
+                    addValueToRow(rowIndex, dateTime);
+                } else {
+                    logger.error(
+                            "Unsupported type for DATETIME, minorType {}, 
class is {}",
+                            minorType.name(),
+                            fieldVector == null ? null : 
fieldVector.getClass());
+                    return false;
+                }
+                break;
+            case "DATETIMEV2":
+                if (minorType.equals(MinorType.VARCHAR)) {
+                    VarCharVector varCharVector = (VarCharVector) fieldVector;
+                    if (varCharVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
                         break;
-                    case "DATETIME":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector timeStampSecVector = (VarCharVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (timeStampSecVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new 
String(timeStampSecVector.get(rowIndex));
-                            LocalDateTime parse = LocalDateTime.parse(value, 
dateTimeFormatter);
-                            addValueToRow(rowIndex, parse);
-                        }
+                    }
+                    String stringValue =
+                            new String(varCharVector.get(rowIndex), 
StandardCharsets.UTF_8);
+                    stringValue = completeMilliseconds(stringValue);
+                    LocalDateTime parse =
+                            FastDateUtil.fastParseDateTimeV2(stringValue, 
DATETIMEV2_PATTERN);
+                    addValueToRow(rowIndex, parse);
+                } else if (fieldVector instanceof TimeStampVector) {
+                    LocalDateTime dateTime = getDateTime(rowIndex, 
fieldVector);
+                    addValueToRow(rowIndex, dateTime);
+                } else {
+                    logger.error(
+                            "Unsupported type for DATETIMEV2, minorType {}, 
class is {}",
+                            minorType.name(),
+                            fieldVector == null ? null : 
fieldVector.getClass());
+                    return false;
+                }
+                break;
+            case "LARGEINT":
+                if (!minorType.equals(MinorType.FIXEDSIZEBINARY)
+                        && !minorType.equals(MinorType.VARCHAR)) {
+                    return false;
+                }
+                if (minorType.equals(MinorType.FIXEDSIZEBINARY)) {
+                    FixedSizeBinaryVector largeIntVector = 
(FixedSizeBinaryVector) fieldVector;
+                    if (largeIntVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
                         break;
-                    case "LARGEINT":
-                    case "CHAR":
-                    case "VARCHAR":
-                    case "STRING":
-                        
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
-                                typeMismatchMessage(currentType, mt));
-                        VarCharVector varCharVector = (VarCharVector) 
curFieldVector;
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            if (varCharVector.isNull(rowIndex)) {
-                                addValueToRow(rowIndex, null);
-                                continue;
-                            }
-                            String value = new 
String(varCharVector.get(rowIndex));
-                            addValueToRow(rowIndex, value);
-                        }
+                    }
+                    byte[] bytes = largeIntVector.get(rowIndex);
+                    int left = 0, right = bytes.length - 1;
+                    while (left < right) {
+                        byte temp = bytes[left];
+                        bytes[left] = bytes[right];
+                        bytes[right] = temp;
+                        left++;
+                        right--;
+                    }
+                    BigInteger largeInt = new BigInteger(bytes);
+                    addValueToRow(rowIndex, largeInt);
+                    break;
+                } else {
+                    VarCharVector largeIntVector = (VarCharVector) fieldVector;
+                    if (largeIntVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
                         break;
-                    default:
-                        String errMsg = "Unsupported type " + 
schema.get(col).getType();
-                        logger.error(errMsg);
-                        throw new DorisException(errMsg);
+                    }
+                    String stringValue =
+                            new String(largeIntVector.get(rowIndex), 
StandardCharsets.UTF_8);
+                    BigInteger largeInt = new BigInteger(stringValue);
+                    addValueToRow(rowIndex, largeInt);
+                    break;
                 }
-            }
-        } catch (Exception e) {
-            close();
-            throw e;
+            case "CHAR":
+            case "VARCHAR":
+            case "STRING":
+            case "JSONB":
+            case "JSON":
+            case "VARIANT":
+                if (!minorType.equals(MinorType.VARCHAR)) {
+                    return false;
+                }
+                VarCharVector varCharVector = (VarCharVector) fieldVector;
+                if (varCharVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                String stringValue =
+                        new String(varCharVector.get(rowIndex), 
StandardCharsets.UTF_8);
+                addValueToRow(rowIndex, stringValue);
+                break;
+            case "ARRAY":
+                if (!minorType.equals(MinorType.LIST)) {
+                    return false;
+                }
+                ListVector listVector = (ListVector) fieldVector;
+                Object listValue =
+                        listVector.isNull(rowIndex) ? null : 
listVector.getObject(rowIndex);
+                // todo: when the subtype of array is date, conversion is 
required
+                addValueToRow(rowIndex, listValue);
+                break;
+            case "MAP":
+                if (!minorType.equals(MinorType.MAP)) {
+                    return false;
+                }
+                MapVector mapVector = (MapVector) fieldVector;
+                UnionMapReader reader = mapVector.getReader();
+                if (mapVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                reader.setPosition(rowIndex);
+                Map<String, Object> mapValue = new HashMap<>();
+                while (reader.next()) {
+                    FieldReader keyReader = reader.key();
+                    FieldReader valueReader = reader.value();
+                    Object mapKeyObj = handleMapFieldReader(keyReader);
+                    Object mapValueObj = handleMapFieldReader(valueReader);
+                    mapValue.put(mapKeyObj.toString(), mapValueObj);
+                }
+                addValueToRow(rowIndex, mapValue);
+                break;
+            case "STRUCT":
+                if (!minorType.equals(MinorType.STRUCT)) {
+                    return false;
+                }
+                StructVector structVector = (StructVector) fieldVector;
+                if (structVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+                Map<String, ?> structValue = structVector.getObject(rowIndex);
+                addValueToRow(rowIndex, structValue);
+                break;
+            default:
+                String errMsg = "Unsupported type " + 
schema.get(col).getType();
+                logger.error(errMsg);
+                throw new DorisException(errMsg);
+        }
+        return true;
+    }
+
+    /**
+     * Converts one map key or value read from Arrow into the Java object stored in the row.
+     *
+     * <p>Values from a {@code TimeStampMicroReaderImpl} go through {@code longToLocalDateTime},
+     * values from a {@code DateDayReaderImpl} become {@link LocalDate}; any other reader's value
+     * is returned as produced by {@code readObject()}.
+     *
+     * @param reader reader positioned on the map key or value to convert
+     * @return the converted value, or {@code null} when the reader yields no value
+     */
+    private Object handleMapFieldReader(FieldReader reader) {
+        if (reader instanceof TimeStampMicroReaderImpl) {
+            // readLong() returns a boxed Long; check for null before unboxing it.
+            Long timestamp = reader.readLong();
+            return timestamp == null ? null : longToLocalDateTime(timestamp);
+        }
+        if (reader instanceof DateDayReaderImpl) {
+            // The day count is read as a boxed value; a null entry must not be unboxed.
+            Object epochDay = reader.readObject();
+            return epochDay == null ? null : LocalDate.ofEpochDay(((Integer) epochDay).longValue());
+        }
+        return reader.readObject();
+    }
+
+    /**
+     * Reads the timestamp at {@code rowIndex} and converts it to a {@link LocalDateTime}.
+     *
+     * @param rowIndex index of the row inside the vector
+     * @param fieldVector column vector; it is cast to {@link TimeStampVector}
+     * @return the converted value, or {@code null} when the slot is null
+     */
+    @VisibleForTesting
+    public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
+        final TimeStampVector timestamps = (TimeStampVector) fieldVector;
+        if (!timestamps.isNull(rowIndex)) {
+            // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded to 6, and there
+            // is also a time zone problem in arrow, so convert from the raw timestamp value first.
+            final long rawValue = timestamps.get(rowIndex);
+            return longToLocalDateTime(rawValue);
+        }
+        return null;
+    }
+
+    @VisibleForTesting
+    public static LocalDateTime longToLocalDateTime(long time) {
+        Instant instant;
+        // Determine the timestamp accuracy and process it
+        if (time < 10_000_000_000L) { // Second timestamp
+            instant = Instant.ofEpochSecond(time);
+        } else if (time < 10_000_000_000_000L) { // milli second
+            instant = Instant.ofEpochMilli(time);
+        } else { // micro second
+            instant = Instant.ofEpochSecond(time / 1_000_000, (time % 
1_000_000) * 1_000);
+        }
+        return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
+    }
+
+    /**
+     * Pads the fractional-second part of a datetime string with trailing zeros so that its length
+     * matches {@code DATETIMEV2_PATTERN}.
+     *
+     * <p>Values already as long as {@code DATETIMEV2_PATTERN}, or shorter than {@code
+     * DATETIME_PATTERN}, are returned unchanged. A value exactly as long as {@code
+     * DATETIME_PATTERN} first gets a {@code '.'} appended. If the remaining gap is not between 1
+     * and 6 characters, no zeros are added.
+     *
+     * <p>A switch over the number of missing digits is used instead of a padding loop. Benchmark
+     * (throughput, 1 thread, 5 samples, ops/s, score error at 99.9%):
+     *
+     * <pre>
+     * CaseWhenTest  40657433.897696 +/- 2515802.067503
+     * WhileTest      9708130.819491 +/- 1207453.635429
+     * </pre>
+     *
+     * @param stringValue datetime string, possibly with a shortened fractional part
+     * @return the padded string
+     */
+    @VisibleForTesting
+    public static String completeMilliseconds(String stringValue) {
+        final int length = stringValue.length();
+        if (length == DATETIMEV2_PATTERN.length() || length < DATETIME_PATTERN.length()) {
+            return stringValue;
+        }
+        // Presumably a value without any fraction yet: add the separator before padding.
+        final String base =
+                length == DATETIME_PATTERN.length() ? stringValue + "." : stringValue;
+        switch (DATETIMEV2_PATTERN.length() - base.length()) {
+            case 1:
+                return base + "0";
+            case 2:
+                return base + "00";
+            case 3:
+                return base + "000";
+            case 4:
+                return base + "0000";
+            case 5:
+                return base + "00000";
+            case 6:
+                return base + "000000";
+            default:
+                return base;
         }
     }
 
+    /**
+     * Returns the column values of the next row in this batch and advances the read offset.
+     *
+     * @return the column values of the row at the current offset
+     * @throws NoSuchElementException if {@code hasNext()} is false; the error is also logged
+     */
-    public List<Object> next() throws DorisException {
+    public List<Object> next() {
         if (!hasNext()) {
-            String errMsg = "Get row offset:" + offsetInRowBatch + " larger 
than row size: " + readRowCount;
+            String errMsg =
+                    "Get row offset:" + offsetInRowBatch + " larger than row 
size: " + readRowCount;
             logger.error(errMsg);
             throw new NoSuchElementException(errMsg);
         }
         return rowBatch.get(offsetInRowBatch++).getCols();
     }
 
+    /**
+     * Builds the message reported when a column's declared type does not match the Arrow minor
+     * type of the vector it was delivered in.
+     *
+     * @param flinkType the column's declared type name
+     * @param arrowType the Arrow minor type of the vector
+     * @return a message of the form "FLINK type is X, but arrow type is Y."
+     */
-    private String typeMismatchMessage(final String sparkType, final 
Types.MinorType arrowType) {
+    private String typeMismatchMessage(final String flinkType, final MinorType 
arrowType) {
         final String messageTemplate = "FLINK type is %1$s, but arrow type is 
%2$s.";
-        return String.format(messageTemplate, sparkType, arrowType.name());
+        return String.format(messageTemplate, flinkType, arrowType.name());
     }
 
     public int getReadRowCount() {
@@ -337,4 +619,4 @@ public class RowBatch {
             // do nothing
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
new file mode 100644
index 00000000..e1e7fd0d
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.util;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/**
+ * Fast parsers for fixed-layout date and datetime strings.
+ *
+ * <p>Instead of {@code LocalDateTime.parse} with a formatter, each numeric field is read directly
+ * at the position of its token in {@code pattern} and passed to {@code LocalDateTime.of} / {@code
+ * LocalDate.of}. The idea comes from https://bugs.openjdk.org/browse/JDK-8144808 (991ms with
+ * {@code LocalDateTime.parse(...)} vs. 246ms with {@code LocalDateTime.of(...)}).
+ *
+ * <p>Only token positions are taken from {@code pattern}: every field in the input must consist
+ * of ASCII digits, and the input must extend at least to the end of each token.
+ */
+public final class FastDateUtil {
+
+    private FastDateUtil() {}
+
+    /**
+     * Parses a datetime with microsecond precision.
+     *
+     * @param dateTime text laid out according to {@code pattern}
+     * @param pattern layout containing {@code yyyy}, {@code MM}, {@code dd}, {@code HH}, {@code
+     *     mm}, {@code ss} and {@code SSSSSS}
+     * @return the parsed value
+     * @throws IllegalArgumentException if a token is missing from {@code pattern}, or a field of
+     *     {@code dateTime} is truncated or contains a non-digit character
+     * @throws java.time.DateTimeException if a field value is out of range
+     */
+    public static LocalDateTime fastParseDateTimeV2(String dateTime, String pattern) {
+        int year = parseField(dateTime, pattern, "yyyy");
+        int month = parseField(dateTime, pattern, "MM");
+        int day = parseField(dateTime, pattern, "dd");
+        int hour = parseField(dateTime, pattern, "HH");
+        int minute = parseField(dateTime, pattern, "mm");
+        int second = parseField(dateTime, pattern, "ss");
+        int micros = parseField(dateTime, pattern, "SSSSSS");
+        return LocalDateTime.of(year, month, day, hour, minute, second, micros * 1000);
+    }
+
+    /**
+     * Parses a datetime with second precision.
+     *
+     * @param dateTime text laid out according to {@code pattern}
+     * @param pattern layout containing {@code yyyy}, {@code MM}, {@code dd}, {@code HH}, {@code
+     *     mm} and {@code ss}
+     * @return the parsed value
+     * @throws IllegalArgumentException if a token is missing from {@code pattern}, or a field of
+     *     {@code dateTime} is truncated or contains a non-digit character
+     * @throws java.time.DateTimeException if a field value is out of range
+     */
+    public static LocalDateTime fastParseDateTime(String dateTime, String pattern) {
+        int year = parseField(dateTime, pattern, "yyyy");
+        int month = parseField(dateTime, pattern, "MM");
+        int day = parseField(dateTime, pattern, "dd");
+        int hour = parseField(dateTime, pattern, "HH");
+        int minute = parseField(dateTime, pattern, "mm");
+        int second = parseField(dateTime, pattern, "ss");
+        return LocalDateTime.of(year, month, day, hour, minute, second);
+    }
+
+    /**
+     * Parses a date.
+     *
+     * @param dateTime text laid out according to {@code pattern}
+     * @param pattern layout containing {@code yyyy}, {@code MM} and {@code dd}
+     * @return the parsed value
+     * @throws IllegalArgumentException if a token is missing from {@code pattern}, or a field of
+     *     {@code dateTime} is truncated or contains a non-digit character
+     * @throws java.time.DateTimeException if a field value is out of range
+     */
+    public static LocalDate fastParseDate(String dateTime, String pattern) {
+        int year = parseField(dateTime, pattern, "yyyy");
+        int month = parseField(dateTime, pattern, "MM");
+        int day = parseField(dateTime, pattern, "dd");
+        return LocalDate.of(year, month, day);
+    }
+
+    /**
+     * Reads the unsigned decimal number found in {@code text} at the position {@code token}
+     * occupies in {@code pattern}.
+     */
+    private static int parseField(String text, String pattern, String token) {
+        int start = pattern.indexOf(token);
+        if (start < 0) {
+            throw new IllegalArgumentException(
+                    "Pattern '" + pattern + "' does not contain field '" + token + "'");
+        }
+        int end = start + token.length();
+        if (text.length() < end) {
+            throw new IllegalArgumentException(
+                    "Text '" + text + "' is too short for pattern '" + pattern + "'");
+        }
+        int value = 0;
+        for (int i = start; i < end; i++) {
+            char c = text.charAt(i);
+            // Reject anything but ASCII digits; otherwise e.g. '-' would silently add -3.
+            if (c < '0' || c > '9') {
+                throw new IllegalArgumentException(
+                        "Text '" + text + "' has non-digit '" + c + "' at index " + i
+                                + " in field '" + token + "'");
+            }
+            value = value * 10 + (c - '0');
+        }
+        return value;
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to