This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new eb90905b [Feature] support reader doris using  arrow flight driver 
(#465)
eb90905b is described below

commit eb90905b82a6e46aeb8b9f0ddea151bcd64554c7
Author: xiayang <[email protected]>
AuthorDate: Mon Aug 12 15:53:33 2024 +0800

    [Feature] support reader doris using  arrow flight driver (#465)
---
 flink-doris-connector/pom.xml                      |  21 ++-
 .../doris/flink/cfg/ConfigurationOptions.java      |   6 +
 .../apache/doris/flink/cfg/DorisReadOptions.java   |  42 ++++-
 .../apache/doris/flink/cfg/DorisStreamOptions.java |  12 ++
 .../org/apache/doris/flink/rest/RestService.java   |  32 ++++
 .../org/apache/doris/flink/rest/SchemaUtils.java   |  24 +++
 .../apache/doris/flink/serialization/RowBatch.java | 102 ++++++++++--
 .../source/reader/DorisFlightValueReader.java      | 182 +++++++++++++++++++++
 .../source/reader/DorisSourceSplitReader.java      |  14 +-
 .../flink/source/reader/DorisValueReader.java      |   2 +-
 .../doris/flink/source/reader/ValueReader.java     |  54 ++++++
 .../flink/source/split/DorisSplitRecords.java      |   8 +-
 .../doris/flink/table/DorisConfigOptions.java      |  11 +-
 .../flink/table/DorisDynamicTableFactory.java      |   9 +-
 .../org/apache/doris/flink/util/FastDateUtil.java  |  90 ++++++++++
 .../apache/doris/flink/rest/SchemaUtilsTest.java   |  58 +++++++
 .../flink/table/DorisDynamicTableFactoryTest.java  |  11 +-
 .../apache/doris/flink/utils/FastDateUtilTest.java |  53 ++++++
 18 files changed, 687 insertions(+), 44 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 2d7b2875..4bf34d68 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -73,7 +73,6 @@ under the License.
         <flink.sql.cdc.version>3.1.1</flink.sql.cdc.version>
         <flink.python.id>flink-python</flink.python.id>
         <libthrift.version>0.16.0</libthrift.version>
-        <arrow.version>13.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
         <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -95,6 +94,8 @@ under the License.
         <jsqlparser.version>4.9</jsqlparser.version>
         <mysql.driver.version>8.0.26</mysql.driver.version>
         <ojdbc.version>19.3.0.0</ojdbc.version>
+        <arrow.version>15.0.2</arrow.version>
+        <adbc.version>0.12.0</adbc.version>
     </properties>
 
     <dependencies>
@@ -179,13 +180,16 @@ under the License.
             <artifactId>commons-codec</artifactId>
             <version>${commons-codec.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.arrow.adbc</groupId>
+            <artifactId>adbc-driver-flight-sql</artifactId>
+            <version>${adbc.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-vector</artifactId>
             <version>${arrow.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-memory-netty</artifactId>
@@ -206,7 +210,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
         <!--  jackson  -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
@@ -410,13 +413,13 @@ under the License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>
-                    <version>3.2.4</version>
+                    <version>3.4.1</version>
                     <configuration>
                         <relocations>
-                            <relocation>
-                                <pattern>org.apache.arrow</pattern>
-                                
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>
-                            </relocation>
+<!--                            <relocation>-->
+<!--                                <pattern>org.apache.arrow</pattern>-->
+<!--                                
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>-->
+<!--                            </relocation>-->
                             <relocation>
                                 <pattern>io.netty</pattern>
                                 
<shadedPattern>org.apache.doris.shaded.io.netty</shadedPattern>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index 4a3f70b8..c249c251 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -51,4 +51,10 @@ public interface ConfigurationOptions {
     Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
     String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
     Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+
+    String USE_FLIGHT_SQL = "source.use-flight-sql";
+    Boolean USE_FLIGHT_SQL_DEFAULT = false;
+
+    String FLIGHT_SQL_PORT = "source.flight-sql-port";
+    Integer FLIGHT_SQL_PORT_DEFAULT = 9040;
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 3669e740..2f6cd8a8 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -37,6 +37,8 @@ public class DorisReadOptions implements Serializable {
     private Integer deserializeQueueSize;
     private Boolean deserializeArrowAsync;
     private boolean useOldApi;
+    private boolean useFlightSql;
+    private Integer flightSqlPort;
 
     public DorisReadOptions(
             String readFields,
@@ -50,7 +52,9 @@ public class DorisReadOptions implements Serializable {
             Long execMemLimit,
             Integer deserializeQueueSize,
             Boolean deserializeArrowAsync,
-            boolean useOldApi) {
+            boolean useOldApi,
+            boolean useFlightSql,
+            Integer flightSqlPort) {
         this.readFields = readFields;
         this.filterQuery = filterQuery;
         this.requestTabletSize = requestTabletSize;
@@ -63,6 +67,8 @@ public class DorisReadOptions implements Serializable {
         this.deserializeQueueSize = deserializeQueueSize;
         this.deserializeArrowAsync = deserializeArrowAsync;
         this.useOldApi = useOldApi;
+        this.useFlightSql = useFlightSql;
+        this.flightSqlPort = flightSqlPort;
     }
 
     public String getReadFields() {
@@ -121,6 +127,14 @@ public class DorisReadOptions implements Serializable {
         this.filterQuery = filterQuery;
     }
 
+    public boolean getUseFlightSql() {
+        return useFlightSql;
+    }
+
+    public Integer getFlightSqlPort() {
+        return flightSqlPort;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -149,7 +163,9 @@ public class DorisReadOptions implements Serializable {
                 && Objects.equals(requestBatchSize, that.requestBatchSize)
                 && Objects.equals(execMemLimit, that.execMemLimit)
                 && Objects.equals(deserializeQueueSize, 
that.deserializeQueueSize)
-                && Objects.equals(deserializeArrowAsync, 
that.deserializeArrowAsync);
+                && Objects.equals(deserializeArrowAsync, 
that.deserializeArrowAsync)
+                && Objects.equals(useFlightSql, that.useFlightSql)
+                && Objects.equals(flightSqlPort, that.flightSqlPort);
     }
 
     @Override
@@ -166,7 +182,9 @@ public class DorisReadOptions implements Serializable {
                 execMemLimit,
                 deserializeQueueSize,
                 deserializeArrowAsync,
-                useOldApi);
+                useOldApi,
+                useFlightSql,
+                flightSqlPort);
     }
 
     /** Builder of {@link DorisReadOptions}. */
@@ -184,6 +202,8 @@ public class DorisReadOptions implements Serializable {
         private Integer deserializeQueueSize;
         private Boolean deserializeArrowAsync;
         private Boolean useOldApi = false;
+        private Boolean useFlightSql = false;
+        private Integer flightSqlPort;
 
         public Builder setReadFields(String readFields) {
             this.readFields = readFields;
@@ -240,11 +260,21 @@ public class DorisReadOptions implements Serializable {
             return this;
         }
 
-        public Builder setUseOldApi(boolean useOldApi) {
+        public Builder setUseFlightSql(Boolean useFlightSql) {
+            this.useFlightSql = useFlightSql;
+            return this;
+        }
+
+        public Builder setUseOldApi(Boolean useOldApi) {
             this.useOldApi = useOldApi;
             return this;
         }
 
+        public Builder setFlightSqlPort(Integer flightSqlPort) {
+            this.flightSqlPort = flightSqlPort;
+            return this;
+        }
+
         public DorisReadOptions build() {
             return new DorisReadOptions(
                     readFields,
@@ -258,7 +288,9 @@ public class DorisReadOptions implements Serializable {
                     execMemLimit,
                     deserializeQueueSize,
                     deserializeArrowAsync,
-                    useOldApi);
+                    useOldApi,
+                    useFlightSql,
+                    flightSqlPort);
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
index 89e9182e..e6d75969 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
@@ -106,6 +106,18 @@ public class DorisStreamOptions implements Serializable {
                                         prop.getProperty(
                                                 
ConfigurationOptions.DORIS_TABLET_SIZE,
                                                 
ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT
+                                                        .toString())))
+                        .setUseFlightSql(
+                                Boolean.valueOf(
+                                        prop.getProperty(
+                                                
ConfigurationOptions.USE_FLIGHT_SQL,
+                                                
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT
+                                                        .toString())))
+                        .setFlightSqlPort(
+                                Integer.valueOf(
+                                        prop.getProperty(
+                                                
ConfigurationOptions.FLIGHT_SQL_PORT,
+                                                
ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT
                                                         .toString())));
 
         this.options = optionsBuilder.build();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 1dbb1fde..1663d4b3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -234,6 +234,38 @@ public class RestService implements Serializable {
         }
     }
 
+    @VisibleForTesting
+    public static String parseFlightSql(
+            DorisReadOptions readOptions,
+            DorisOptions options,
+            PartitionDefinition partition,
+            Logger logger)
+            throws IllegalArgumentException {
+        String[] tableIdentifiers = 
parseIdentifier(options.getTableIdentifier(), logger);
+        String readFields =
+                StringUtils.isBlank(readOptions.getReadFields())
+                        ? "*"
+                        : readOptions.getReadFields();
+        String sql =
+                "select "
+                        + readFields
+                        + " from `"
+                        + tableIdentifiers[0]
+                        + "`.`"
+                        + tableIdentifiers[1]
+                        + "`";
+        String tablet =
+                partition.getTabletIds().stream()
+                        .map(Object::toString)
+                        .collect(Collectors.joining(","));
+        sql += "  TABLET(" + tablet + ") ";
+        if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
+            sql += " where " + readOptions.getFilterQuery();
+        }
+        logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
+        return sql;
+    }
+
     /**
      * parse table identifier to array.
      *
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
index f6594b5b..9f7d7fc6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
@@ -22,6 +22,9 @@ import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.sdk.thrift.TScanColumnDesc;
 
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class SchemaUtils {
 
@@ -46,4 +49,25 @@ public class SchemaUtils {
                                                 "")));
         return schema;
     }
+
+    public static Schema convertToSchema(
+            Schema tableSchema, org.apache.arrow.vector.types.pojo.Schema 
tscanColumnDescs) {
+        Schema schema = new Schema(tscanColumnDescs.getFields().size());
+        Map<String, Field> collect =
+                tableSchema.getProperties().stream()
+                        .collect(Collectors.toMap(Field::getName, 
Function.identity()));
+        tscanColumnDescs
+                .getFields()
+                .forEach(
+                        desc ->
+                                schema.put(
+                                        new Field(
+                                                desc.getName(),
+                                                
collect.get(desc.getName()).getType(),
+                                                "",
+                                                0,
+                                                0,
+                                                "")));
+        return schema;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 38c63b77..dee9c1fc 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -44,11 +44,13 @@ import 
org.apache.arrow.vector.complex.impl.DateDayReaderImpl;
 import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl;
 import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.util.FastDateUtil;
 import org.apache.doris.flink.util.IPUtils;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
 import org.slf4j.Logger;
@@ -58,6 +60,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -96,18 +99,19 @@ public class RowBatch {
     private int rowCountInOneBatch = 0;
     private int readRowCount = 0;
     private final List<Row> rowBatch = new ArrayList<>();
-    private final ArrowStreamReader arrowStreamReader;
+    private final ArrowReader arrowStreamReader;
     private VectorSchemaRoot root;
     private List<FieldVector> fieldVectors;
     private RootAllocator rootAllocator;
     private final Schema schema;
     private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
     private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd 
HH:mm:ss.SSSSSS";
+    private static final String DATE_PATTERN = "yyyy-MM-dd";
     private final DateTimeFormatter dateTimeFormatter =
             DateTimeFormatter.ofPattern(DATETIME_PATTERN);
     private final DateTimeFormatter dateTimeV2Formatter =
             DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
-    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern(DATE_PATTERN);
     private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
 
     public List<Row> getRowBatch() {
@@ -123,6 +127,43 @@ public class RowBatch {
         this.offsetInRowBatch = 0;
     }
 
+    public RowBatch(ArrowReader nextResult, Schema schema) {
+        this.schema = schema;
+        this.arrowStreamReader = nextResult;
+        this.offsetInRowBatch = 0;
+    }
+
+    public RowBatch readFlightArrow() {
+        try {
+            this.root = arrowStreamReader.getVectorSchemaRoot();
+            fieldVectors = root.getFieldVectors();
+            if (fieldVectors.size() > schema.size()) {
+                logger.error(
+                        "Schema size '{}' is not equal to arrow field size 
'{}'.",
+                        fieldVectors.size(),
+                        schema.size());
+                throw new DorisException(
+                        "Load Doris data failed, schema size of fetch data is 
wrong.");
+            }
+            if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
+                logger.debug("One batch in arrow has no data.");
+                return null;
+            }
+            rowCountInOneBatch = root.getRowCount();
+            for (int i = 0; i < rowCountInOneBatch; ++i) {
+                rowBatch.add(new RowBatch.Row(fieldVectors.size()));
+            }
+            convertArrowToRowBatch();
+            readRowCount += root.getRowCount();
+            return this;
+        } catch (DorisException e) {
+            logger.error("Read Doris Data failed because: ", e);
+            throw new DorisRuntimeException(e.getMessage());
+        } catch (IOException e) {
+            return this;
+        }
+    }
+
     public RowBatch readArrow() {
         try {
             this.root = arrowStreamReader.getVectorSchemaRoot();
@@ -297,6 +338,7 @@ public class RowBatch {
             case "DECIMAL32":
             case "DECIMAL64":
             case "DECIMAL128I":
+            case "DECIMAL128":
                 if (!minorType.equals(Types.MinorType.DECIMAL)) {
                     return false;
                 }
@@ -320,8 +362,8 @@ public class RowBatch {
                         addValueToRow(rowIndex, null);
                         break;
                     }
-                    String stringValue = new String(date.get(rowIndex));
-                    LocalDate localDate = LocalDate.parse(stringValue, 
dateFormatter);
+                    String stringValue = new String(date.get(rowIndex), 
StandardCharsets.UTF_8);
+                    LocalDate localDate = 
FastDateUtil.fastParseDate(stringValue, DATE_PATTERN);
                     addValueToRow(rowIndex, localDate);
                 } else {
                     DateDayVector date = (DateDayVector) fieldVector;
@@ -340,8 +382,11 @@ public class RowBatch {
                         addValueToRow(rowIndex, null);
                         break;
                     }
-                    String stringValue = new 
String(varCharVector.get(rowIndex));
-                    LocalDateTime parse = LocalDateTime.parse(stringValue, 
dateTimeFormatter);
+                    String stringValue =
+                            new String(varCharVector.get(rowIndex), 
StandardCharsets.UTF_8);
+                    stringValue = completeMilliseconds(stringValue);
+                    LocalDateTime parse =
+                            FastDateUtil.fastParseDateTime(stringValue, 
DATETIME_PATTERN);
                     addValueToRow(rowIndex, parse);
                 } else if (fieldVector instanceof TimeStampVector) {
                     LocalDateTime dateTime = getDateTime(rowIndex, 
fieldVector);
@@ -361,9 +406,11 @@ public class RowBatch {
                         addValueToRow(rowIndex, null);
                         break;
                     }
-                    String stringValue = new 
String(varCharVector.get(rowIndex));
+                    String stringValue =
+                            new String(varCharVector.get(rowIndex), 
StandardCharsets.UTF_8);
                     stringValue = completeMilliseconds(stringValue);
-                    LocalDateTime parse = LocalDateTime.parse(stringValue, 
dateTimeV2Formatter);
+                    LocalDateTime parse =
+                            FastDateUtil.fastParseDateTimeV2(stringValue, 
DATETIMEV2_PATTERN);
                     addValueToRow(rowIndex, parse);
                 } else if (fieldVector instanceof TimeStampVector) {
                     LocalDateTime dateTime = getDateTime(rowIndex, 
fieldVector);
@@ -405,7 +452,8 @@ public class RowBatch {
                         addValueToRow(rowIndex, null);
                         break;
                     }
-                    String stringValue = new 
String(largeIntVector.get(rowIndex));
+                    String stringValue =
+                            new String(largeIntVector.get(rowIndex), 
StandardCharsets.UTF_8);
                     BigInteger largeInt = new BigInteger(stringValue);
                     addValueToRow(rowIndex, largeInt);
                     break;
@@ -423,7 +471,8 @@ public class RowBatch {
                     addValueToRow(rowIndex, null);
                     break;
                 }
-                String stringValue = new String(varCharVector.get(rowIndex));
+                String stringValue =
+                        new String(varCharVector.get(rowIndex), 
StandardCharsets.UTF_8);
                 addValueToRow(rowIndex, stringValue);
                 break;
             case "IPV6":
@@ -435,7 +484,8 @@ public class RowBatch {
                     addValueToRow(rowIndex, null);
                     break;
                 }
-                String ipv6Str = new String(ipv6VarcharVector.get(rowIndex));
+                String ipv6Str =
+                        new String(ipv6VarcharVector.get(rowIndex), 
StandardCharsets.UTF_8);
                 String ipv6Address = IPUtils.fromBigInteger(new 
BigInteger(ipv6Str));
                 addValueToRow(rowIndex, ipv6Address);
                 break;
@@ -526,6 +576,14 @@ public class RowBatch {
         return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
     }
 
+    /**
+     * use case when to replace while 
"Benchmark","Mode","Threads","Samples","Score","Score Error.
+     * (99.9%)","Unit" "CaseWhenTest", "thrpt", 1, 5, 40657433.897696, 
2515802.067503,"ops/s"
+     * "WhileTest", "thrpt", 1, 5, 9708130.819491, 1207453.635429,"ops/s"
+     *
+     * @param stringValue
+     * @return
+     */
     @VisibleForTesting
     public static String completeMilliseconds(String stringValue) {
         if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
@@ -536,14 +594,26 @@ public class RowBatch {
             return stringValue;
         }
 
-        StringBuilder sb = new StringBuilder(stringValue);
         if (stringValue.length() == DATETIME_PATTERN.length()) {
-            sb.append(".");
+            stringValue += ".";
         }
-        while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
-            sb.append(0);
+        int s = DATETIMEV2_PATTERN.length() - stringValue.length();
+        switch (s) {
+            case 1:
+                return stringValue + "0";
+            case 2:
+                return stringValue + "00";
+            case 3:
+                return stringValue + "000";
+            case 4:
+                return stringValue + "0000";
+            case 5:
+                return stringValue + "00000";
+            case 6:
+                return stringValue + "000000";
+            default:
+                return stringValue;
         }
-        return sb.toString();
     }
 
     public List<Object> next() {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
new file mode 100644
index 00000000..a2b9b632
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.reader;
+
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.exception.ShouldNeverHappenException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.SchemaUtils;
+import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.serialization.RowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static 
org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
+
+public class DorisFlightValueReader extends ValueReader implements 
AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisFlightValueReader.class);
+    protected AdbcConnection client;
+    protected Lock clientLock = new ReentrantLock();
+
+    private final PartitionDefinition partition;
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
+    private AdbcStatement statement;
+    protected RowBatch rowBatch;
+    protected Schema schema;
+    AdbcStatement.QueryResult queryResult;
+    protected ArrowReader arrowReader;
+    protected AtomicBoolean eos = new AtomicBoolean(false);
+
+    public DorisFlightValueReader(
+            PartitionDefinition partition,
+            DorisOptions options,
+            DorisReadOptions readOptions,
+            Schema schema) {
+        this.partition = partition;
+        this.options = options;
+        this.readOptions = readOptions;
+        this.client = openConnection();
+        this.schema = schema;
+        init();
+    }
+
+    private void init() {
+        clientLock.lock();
+        try {
+            this.statement = this.client.createStatement();
+            this.statement.setSqlQuery(
+                    RestService.parseFlightSql(readOptions, options, 
partition, LOG));
+            this.queryResult = statement.executeQuery();
+            this.arrowReader = queryResult.getReader();
+        } catch (AdbcException | DorisException e) {
+            throw new RuntimeException(e);
+        } finally {
+            clientLock.unlock();
+        }
+        LOG.debug("Open scan result is, schema: {}.", schema);
+    }
+
+    private AdbcConnection openConnection() {
+        final Map<String, Object> parameters = new HashMap<>();
+        RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+        FlightSqlDriver driver = new FlightSqlDriver(allocator);
+        String[] split = null;
+        try {
+            split = RestService.randomEndpoint(options.getFenodes(), 
LOG).split(":");
+        } catch (IllegalArgumentException e) {
+            throw new RuntimeException("Get FENode Error", e);
+        }
+        AdbcDriver.PARAM_URI.set(
+                parameters,
+                Location.forGrpcInsecure(String.valueOf(split[0]), 
readOptions.getFlightSqlPort())
+                        .getUri()
+                        .toString());
+        AdbcDriver.PARAM_USERNAME.set(parameters, options.getUsername());
+        AdbcDriver.PARAM_PASSWORD.set(parameters, options.getPassword());
+        try {
+            AdbcDatabase adbcDatabase = driver.open(parameters);
+            return adbcDatabase.connect();
+        } catch (AdbcException e) {
+            LOG.debug("Open Flight Connection error: {}", e.getDetails());
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * read data and cached in rowBatch.
+     *
+     * @return true if hax next value
+     */
+    public boolean hasNext() {
+        boolean hasNext = false;
+        clientLock.lock();
+        try {
+            // Arrow data was acquired synchronously during the iterative 
process
+            if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+                if (!eos.get()) {
+                    eos.set(!arrowReader.loadNextBatch());
+                    rowBatch =
+                            new RowBatch(
+                                            arrowReader,
+                                            SchemaUtils.convertToSchema(
+                                                    this.schema,
+                                                    
arrowReader.getVectorSchemaRoot().getSchema()))
+                                    .readFlightArrow();
+                }
+            }
+            hasNext = !eos.get();
+            return hasNext;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            clientLock.unlock();
+        }
+    }
+
+    /**
+     * get next value.
+     *
+     * @return next value
+     */
+    public List next() {
+        if (!hasNext()) {
+            LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        return rowBatch.next();
+    }
+
+    @Override
+    public void close() throws Exception {
+        clientLock.lock();
+        try {
+            if (rowBatch != null) {
+                rowBatch.close();
+            }
+            if (statement != null) {
+                statement.close();
+            }
+            if (client != null) {
+                client.close();
+            }
+        } finally {
+            clientLock.unlock();
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
index 01777d43..c9ed6f9c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.source.split.DorisSourceSplit;
 import org.apache.doris.flink.source.split.DorisSplitRecords;
 import org.slf4j.Logger;
@@ -41,7 +42,7 @@ public class DorisSourceSplitReader implements 
SplitReader<List, DorisSourceSpli
     private final Queue<DorisSourceSplit> splits;
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
-    private DorisValueReader valueReader;
+    private ValueReader valueReader;
     private String currentSplitId;
 
     public DorisSourceSplitReader(DorisOptions options, DorisReadOptions 
readOptions) {
@@ -52,7 +53,11 @@ public class DorisSourceSplitReader implements 
SplitReader<List, DorisSourceSpli
 
     @Override
     public RecordsWithSplitIds<List> fetch() throws IOException {
-        checkSplitOrStartNext();
+        try {
+            checkSplitOrStartNext();
+        } catch (DorisException e) {
+            throw new RuntimeException(e);
+        }
 
         if (!valueReader.hasNext()) {
             return finishSplit();
@@ -60,7 +65,7 @@ public class DorisSourceSplitReader implements 
SplitReader<List, DorisSourceSpli
         return DorisSplitRecords.forRecords(currentSplitId, valueReader);
     }
 
-    private void checkSplitOrStartNext() throws IOException {
+    private void checkSplitOrStartNext() throws IOException, DorisException {
         if (valueReader != null) {
             return;
         }
@@ -70,7 +75,8 @@ public class DorisSourceSplitReader implements 
SplitReader<List, DorisSourceSpli
         }
         currentSplitId = nextSplit.splitId();
         valueReader =
-                new DorisValueReader(nextSplit.getPartitionDefinition(), 
options, readOptions);
+                ValueReader.createReader(
+                        nextSplit.getPartitionDefinition(), options, 
readOptions, LOG);
     }
 
     private DorisSplitRecords finishSplit() {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 098a7707..35639e8a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -52,7 +52,7 @@ import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIM
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
 import static 
org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
-public class DorisValueReader implements AutoCloseable {
+public class DorisValueReader extends ValueReader implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisValueReader.class);
     protected BackendClient client;
     protected Lock clientLock = new ReentrantLock();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
new file mode 100644
index 00000000..9e453934
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+public abstract class ValueReader {
+
+    public static ValueReader createReader(
+            PartitionDefinition partition,
+            DorisOptions options,
+            DorisReadOptions readOptions,
+            Logger logger)
+            throws DorisException {
+        logger.info("create reader for partition: {}", partition);
+        if (readOptions.getUseFlightSql()) {
+            return new DorisFlightValueReader(
+                    partition,
+                    options,
+                    readOptions,
+                    RestService.getSchema(options, readOptions, logger));
+        } else {
+            return new DorisValueReader(partition, options, readOptions);
+        }
+    }
+
+    public abstract boolean hasNext();
+
+    public abstract List next();
+
+    public abstract void close() throws Exception;
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
index cef96762..24d10569 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.source.split;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import org.apache.doris.flink.source.reader.DorisValueReader;
+import org.apache.doris.flink.source.reader.ValueReader;
 
 import javax.annotation.Nullable;
 
@@ -34,18 +35,17 @@ import java.util.Set;
 public class DorisSplitRecords implements RecordsWithSplitIds<List> {
 
     private final Set<String> finishedSplits;
-    private final DorisValueReader valueReader;
+    private final ValueReader valueReader;
     private String splitId;
 
-    public DorisSplitRecords(
-            String splitId, DorisValueReader valueReader, Set<String> 
finishedSplits) {
+    public DorisSplitRecords(String splitId, ValueReader valueReader, 
Set<String> finishedSplits) {
         this.splitId = splitId;
         this.valueReader = valueReader;
         this.finishedSplits = finishedSplits;
     }
 
     public static DorisSplitRecords forRecords(
-            final String splitId, final DorisValueReader valueReader) {
+            final String splitId, final ValueReader valueReader) {
         return new DorisSplitRecords(splitId, valueReader, 
Collections.emptySet());
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 4b0b56c4..02e59084 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -307,7 +307,16 @@ public class DorisConfigOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Whether to use buffer cache for 
breakpoint resume");
-
+    public static final ConfigOption<Boolean> USE_FLIGHT_SQL =
+            ConfigOptions.key("source.use-flight-sql")
+                    .booleanType()
+                    .defaultValue(Boolean.FALSE)
+                    .withDescription("use flight sql flag");
+    public static final ConfigOption<Integer> FLIGHT_SQL_PORT =
+            ConfigOptions.key("source.flight-sql-port")
+                    .intType()
+                    .defaultValue(9040)
+                    .withDescription("flight sql port");
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index e8ac4dbf..2559e1f0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -52,6 +52,7 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT;
 import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
 import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS;
@@ -83,6 +84,7 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API
 import static 
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
 import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+import static org.apache.doris.flink.table.DorisConfigOptions.USE_FLIGHT_SQL;
 
 /**
  * The {@link DorisDynamicTableFactory} translates the catalog table to a 
table source.
@@ -157,6 +159,9 @@ public final class DorisDynamicTableFactory
         options.add(SOURCE_USE_OLD_API);
         options.add(SINK_WRITE_MODE);
         options.add(SINK_IGNORE_COMMIT_ERROR);
+
+        options.add(USE_FLIGHT_SQL);
+        options.add(FLIGHT_SQL_PORT);
         return options;
     }
 
@@ -216,7 +221,9 @@ public final class DorisDynamicTableFactory
                         (int) 
readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis())
                 .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
                 .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
-                .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
+                .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API))
+                .setUseFlightSql(readableConfig.get(USE_FLIGHT_SQL))
+                .setFlightSqlPort(readableConfig.get(FLIGHT_SQL_PORT));
         return builder.build();
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
new file mode 100644
index 00000000..3c24b810
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.util;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/**
+ * idea for this util is from https://bugs.openjdk.org/browse/JDK-8144808 
991ms.
+ * LocalDateTime.parse(...) 246ms : LocalDateTime.of(...)
+ */
+public final class FastDateUtil {
+
+    public static LocalDateTime fastParseDateTimeV2(String dateTime, String 
pattern) {
+        char[] arr = dateTime.toCharArray();
+        int[] indexes =
+                new int[] {
+                    pattern.indexOf("yyyy"),
+                    pattern.indexOf("MM"),
+                    pattern.indexOf("dd"),
+                    pattern.indexOf("HH"),
+                    pattern.indexOf("mm"),
+                    pattern.indexOf("ss"),
+                    pattern.indexOf("SSSSSS")
+                };
+        int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+        int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+        int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+        int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2);
+        int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2);
+        int second = parseFromIndex(arr, indexes[5], indexes[5] + 2);
+        int nanos = parseFromIndex(arr, indexes[6], indexes[6] + 6) * 1000;
+        return LocalDateTime.of(year, month, day, hour, minute, second, nanos);
+    }
+
+    public static LocalDateTime fastParseDateTime(String dateTime, String 
pattern) {
+        char[] arr = dateTime.toCharArray();
+        int[] indexes =
+                new int[] {
+                    pattern.indexOf("yyyy"),
+                    pattern.indexOf("MM"),
+                    pattern.indexOf("dd"),
+                    pattern.indexOf("HH"),
+                    pattern.indexOf("mm"),
+                    pattern.indexOf("ss")
+                };
+        int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+        int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+        int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+        int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2);
+        int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2);
+        int second = parseFromIndex(arr, indexes[5], indexes[5] + 2);
+        return LocalDateTime.of(year, month, day, hour, minute, second);
+    }
+
+    public static LocalDate fastParseDate(String dateTime, String pattern) {
+        char[] arr = dateTime.toCharArray();
+        int[] indexes =
+                new int[] {
+                    pattern.indexOf("yyyy"), pattern.indexOf("MM"), 
pattern.indexOf("dd"),
+                };
+        int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+        int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+        int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+        return LocalDate.of(year, month, day);
+    }
+
+    private static int parseFromIndex(char[] arr, int start, int end) {
+        int value = 0;
+        for (int i = start; i < end; i++) {
+            value = value * 10 + (arr[i] - '0');
+        }
+        return value;
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java
new file mode 100644
index 00000000..2353bd7d
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.rest;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.doris.flink.exception.DorisException;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class SchemaUtilsTest {
+    private static final Logger logger = 
LoggerFactory.getLogger(SchemaUtilsTest.class);
+
+    @Test
+    public void convertToSchema() throws DorisException {
+        Field field1 =
+                new Field("field1", FieldType.notNullable(new 
ArrowType.Int(32, true)), null);
+        Field field2 =
+                new Field("field2", FieldType.notNullable(new 
ArrowType.Int(32, true)), null);
+        Schema arrowSchema = new Schema(Arrays.asList(field1, field2));
+        String schemaStr =
+                "{\"properties\":["
+                        + 
"{\"type\":\"int\",\"name\":\"field1\",\"comment\":\"\"}"
+                        + 
",{\"type\":\"int\",\"name\":\"field2\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+        org.apache.doris.flink.rest.models.Schema schema =
+                RestService.parseSchema(schemaStr, logger);
+
+        org.apache.doris.flink.rest.models.Schema result =
+                SchemaUtils.convertToSchema(schema, arrowSchema);
+
+        assertEquals(2, result.getProperties().size());
+        assertEquals("field1", result.getProperties().get(0).getName());
+        assertEquals("field2", result.getProperties().get(1).getName());
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 0004af05..05a93dc5 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -65,7 +65,8 @@ public class DorisDynamicTableFactoryTest {
         properties.put("lookup.jdbc.read.batch.size", "16");
         properties.put("lookup.jdbc.read.batch.queue-size", "16");
         properties.put("lookup.jdbc.read.thread-size", "1");
-
+        properties.put("source.use-flight-sql", "false");
+        properties.put("source.flight-sql-port", "9040");
         DynamicTableSource actual = createTableSource(SCHEMA, properties);
         DorisOptions options =
                 DorisOptions.builder()
@@ -98,7 +99,9 @@ public class DorisDynamicTableFactoryTest {
                 
.setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
                 .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
                 .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
-                .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT);
+                .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
+                .setUseFlightSql(false)
+                .setFlightSqlPort(9040);
         DorisDynamicTableSource expected =
                 new DorisDynamicTableSource(
                         options,
@@ -182,7 +185,9 @@ public class DorisDynamicTableFactoryTest {
                 
.setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
                 .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
                 .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
-                .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT);
+                .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
+                .setUseFlightSql(false)
+                .setFlightSqlPort(9040);
         DorisDynamicTableSink expected =
                 new DorisDynamicTableSink(
                         options,
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
new file mode 100644
index 00000000..a89e50a0
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.utils;
+
+import org.apache.doris.flink.util.FastDateUtil;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FastDateUtilTest {
+
+    @Test
+    void 
fastParseDateTimeV2_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() {
+        String dateTime = "2023-10-05 14:30:45.123456";
+        String pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+        LocalDateTime result = FastDateUtil.fastParseDateTimeV2(dateTime, 
pattern);
+        assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45, 123456000), 
result);
+    }
+
+    @Test
+    void 
fastParseDateTime_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() {
+        String dateTime = "2023-10-05 14:30:45";
+        String pattern = "yyyy-MM-dd HH:mm:ss";
+        LocalDateTime result = FastDateUtil.fastParseDateTime(dateTime, 
pattern);
+        assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45), result);
+    }
+
+    @Test
+    void fastParseDate_withValidDateAndPattern_returnsCorrectLocalDate() {
+        String dateTime = "2023-10-05";
+        String pattern = "yyyy-MM-dd";
+        LocalDate result = FastDateUtil.fastParseDate(dateTime, pattern);
+        assertEquals(LocalDate.of(2023, 10, 5), result);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to