This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new eb90905b [Feature] support reader doris using arrow flight driver
(#465)
eb90905b is described below
commit eb90905b82a6e46aeb8b9f0ddea151bcd64554c7
Author: xiayang <[email protected]>
AuthorDate: Mon Aug 12 15:53:33 2024 +0800
[Feature] support reader doris using arrow flight driver (#465)
---
flink-doris-connector/pom.xml | 21 ++-
.../doris/flink/cfg/ConfigurationOptions.java | 6 +
.../apache/doris/flink/cfg/DorisReadOptions.java | 42 ++++-
.../apache/doris/flink/cfg/DorisStreamOptions.java | 12 ++
.../org/apache/doris/flink/rest/RestService.java | 32 ++++
.../org/apache/doris/flink/rest/SchemaUtils.java | 24 +++
.../apache/doris/flink/serialization/RowBatch.java | 102 ++++++++++--
.../source/reader/DorisFlightValueReader.java | 182 +++++++++++++++++++++
.../source/reader/DorisSourceSplitReader.java | 14 +-
.../flink/source/reader/DorisValueReader.java | 2 +-
.../doris/flink/source/reader/ValueReader.java | 54 ++++++
.../flink/source/split/DorisSplitRecords.java | 8 +-
.../doris/flink/table/DorisConfigOptions.java | 11 +-
.../flink/table/DorisDynamicTableFactory.java | 9 +-
.../org/apache/doris/flink/util/FastDateUtil.java | 90 ++++++++++
.../apache/doris/flink/rest/SchemaUtilsTest.java | 58 +++++++
.../flink/table/DorisDynamicTableFactoryTest.java | 11 +-
.../apache/doris/flink/utils/FastDateUtilTest.java | 53 ++++++
18 files changed, 687 insertions(+), 44 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 2d7b2875..4bf34d68 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -73,7 +73,6 @@ under the License.
<flink.sql.cdc.version>3.1.1</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
- <arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -95,6 +94,8 @@ under the License.
<jsqlparser.version>4.9</jsqlparser.version>
<mysql.driver.version>8.0.26</mysql.driver.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
+ <arrow.version>15.0.2</arrow.version>
+ <adbc.version>0.12.0</adbc.version>
</properties>
<dependencies>
@@ -179,13 +180,16 @@ under the License.
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-driver-flight-sql</artifactId>
+ <version>${adbc.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
-
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
@@ -206,7 +210,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -410,13 +413,13 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.2.4</version>
+ <version>3.4.1</version>
<configuration>
<relocations>
- <relocation>
- <pattern>org.apache.arrow</pattern>
-
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>
- </relocation>
+<!-- <relocation>-->
+<!-- <pattern>org.apache.arrow</pattern>-->
+<!--
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>-->
+<!-- </relocation>-->
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.doris.shaded.io.netty</shadedPattern>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index 4a3f70b8..c249c251 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -51,4 +51,10 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+
+ String USE_FLIGHT_SQL = "source.use-flight-sql";
+ Boolean USE_FLIGHT_SQL_DEFAULT = false;
+
+ String FLIGHT_SQL_PORT = "source.flight-sql-port";
+ Integer FLIGHT_SQL_PORT_DEFAULT = 9040;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 3669e740..2f6cd8a8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -37,6 +37,8 @@ public class DorisReadOptions implements Serializable {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private boolean useOldApi;
+ private boolean useFlightSql;
+ private Integer flightSqlPort;
public DorisReadOptions(
String readFields,
@@ -50,7 +52,9 @@ public class DorisReadOptions implements Serializable {
Long execMemLimit,
Integer deserializeQueueSize,
Boolean deserializeArrowAsync,
- boolean useOldApi) {
+ boolean useOldApi,
+ boolean useFlightSql,
+ Integer flightSqlPort) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
@@ -63,6 +67,8 @@ public class DorisReadOptions implements Serializable {
this.deserializeQueueSize = deserializeQueueSize;
this.deserializeArrowAsync = deserializeArrowAsync;
this.useOldApi = useOldApi;
+ this.useFlightSql = useFlightSql;
+ this.flightSqlPort = flightSqlPort;
}
public String getReadFields() {
@@ -121,6 +127,14 @@ public class DorisReadOptions implements Serializable {
this.filterQuery = filterQuery;
}
+ public boolean getUseFlightSql() {
+ return useFlightSql;
+ }
+
+ public Integer getFlightSqlPort() {
+ return flightSqlPort;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -149,7 +163,9 @@ public class DorisReadOptions implements Serializable {
&& Objects.equals(requestBatchSize, that.requestBatchSize)
&& Objects.equals(execMemLimit, that.execMemLimit)
&& Objects.equals(deserializeQueueSize,
that.deserializeQueueSize)
- && Objects.equals(deserializeArrowAsync,
that.deserializeArrowAsync);
+ && Objects.equals(deserializeArrowAsync,
that.deserializeArrowAsync)
+ && Objects.equals(useFlightSql, that.useFlightSql)
+ && Objects.equals(flightSqlPort, that.flightSqlPort);
}
@Override
@@ -166,7 +182,9 @@ public class DorisReadOptions implements Serializable {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
- useOldApi);
+ useOldApi,
+ useFlightSql,
+ flightSqlPort);
}
/** Builder of {@link DorisReadOptions}. */
@@ -184,6 +202,8 @@ public class DorisReadOptions implements Serializable {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private Boolean useOldApi = false;
+ private Boolean useFlightSql = false;
+ private Integer flightSqlPort;
public Builder setReadFields(String readFields) {
this.readFields = readFields;
@@ -240,11 +260,21 @@ public class DorisReadOptions implements Serializable {
return this;
}
- public Builder setUseOldApi(boolean useOldApi) {
+ public Builder setUseFlightSql(Boolean useFlightSql) {
+ this.useFlightSql = useFlightSql;
+ return this;
+ }
+
+ public Builder setUseOldApi(Boolean useOldApi) {
this.useOldApi = useOldApi;
return this;
}
+ public Builder setFlightSqlPort(Integer flightSqlPort) {
+ this.flightSqlPort = flightSqlPort;
+ return this;
+ }
+
public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
@@ -258,7 +288,9 @@ public class DorisReadOptions implements Serializable {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
- useOldApi);
+ useOldApi,
+ useFlightSql,
+ flightSqlPort);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
index 89e9182e..e6d75969 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
@@ -106,6 +106,18 @@ public class DorisStreamOptions implements Serializable {
prop.getProperty(
ConfigurationOptions.DORIS_TABLET_SIZE,
ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT
+ .toString())))
+ .setUseFlightSql(
+ Boolean.valueOf(
+ prop.getProperty(
+
ConfigurationOptions.USE_FLIGHT_SQL,
+
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT
+ .toString())))
+ .setFlightSqlPort(
+ Integer.valueOf(
+ prop.getProperty(
+
ConfigurationOptions.FLIGHT_SQL_PORT,
+
ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT
.toString())));
this.options = optionsBuilder.build();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 1dbb1fde..1663d4b3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -234,6 +234,38 @@ public class RestService implements Serializable {
}
}
+ @VisibleForTesting
+ public static String parseFlightSql(
+ DorisReadOptions readOptions,
+ DorisOptions options,
+ PartitionDefinition partition,
+ Logger logger)
+ throws IllegalArgumentException {
+ String[] tableIdentifiers =
parseIdentifier(options.getTableIdentifier(), logger);
+ String readFields =
+ StringUtils.isBlank(readOptions.getReadFields())
+ ? "*"
+ : readOptions.getReadFields();
+ String sql =
+ "select "
+ + readFields
+ + " from `"
+ + tableIdentifiers[0]
+ + "`.`"
+ + tableIdentifiers[1]
+ + "`";
+ String tablet =
+ partition.getTabletIds().stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(","));
+ sql += " TABLET(" + tablet + ") ";
+ if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
+ sql += " where " + readOptions.getFilterQuery();
+ }
+ logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
+ return sql;
+ }
+
/**
* parse table identifier to array.
*
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
index f6594b5b..9f7d7fc6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
@@ -22,6 +22,9 @@ import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class SchemaUtils {
@@ -46,4 +49,25 @@ public class SchemaUtils {
"")));
return schema;
}
+
+ public static Schema convertToSchema(
+ Schema tableSchema, org.apache.arrow.vector.types.pojo.Schema
tscanColumnDescs) {
+ Schema schema = new Schema(tscanColumnDescs.getFields().size());
+ Map<String, Field> collect =
+ tableSchema.getProperties().stream()
+ .collect(Collectors.toMap(Field::getName,
Function.identity()));
+ tscanColumnDescs
+ .getFields()
+ .forEach(
+ desc ->
+ schema.put(
+ new Field(
+ desc.getName(),
+
collect.get(desc.getName()).getType(),
+ "",
+ 0,
+ 0,
+ "")));
+ return schema;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 38c63b77..dee9c1fc 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -44,11 +44,13 @@ import
org.apache.arrow.vector.complex.impl.DateDayReaderImpl;
import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl;
import org.apache.arrow.vector.complex.impl.UnionMapReader;
import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.util.FastDateUtil;
import org.apache.doris.flink.util.IPUtils;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.slf4j.Logger;
@@ -58,6 +60,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -96,18 +99,19 @@ public class RowBatch {
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
private final List<Row> rowBatch = new ArrayList<>();
- private final ArrowStreamReader arrowStreamReader;
+ private final ArrowReader arrowStreamReader;
private VectorSchemaRoot root;
private List<FieldVector> fieldVectors;
private RootAllocator rootAllocator;
private final Schema schema;
private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd
HH:mm:ss.SSSSSS";
+ private static final String DATE_PATTERN = "yyyy-MM-dd";
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern(DATETIME_PATTERN);
private final DateTimeFormatter dateTimeV2Formatter =
DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
- private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern(DATE_PATTERN);
private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
public List<Row> getRowBatch() {
@@ -123,6 +127,43 @@ public class RowBatch {
this.offsetInRowBatch = 0;
}
+ public RowBatch(ArrowReader nextResult, Schema schema) {
+ this.schema = schema;
+ this.arrowStreamReader = nextResult;
+ this.offsetInRowBatch = 0;
+ }
+
+ public RowBatch readFlightArrow() {
+ try {
+ this.root = arrowStreamReader.getVectorSchemaRoot();
+ fieldVectors = root.getFieldVectors();
+ if (fieldVectors.size() > schema.size()) {
+ logger.error(
+ "Schema size '{}' is not equal to arrow field size
'{}'.",
+ fieldVectors.size(),
+ schema.size());
+ throw new DorisException(
+ "Load Doris data failed, schema size of fetch data is
wrong.");
+ }
+ if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
+ logger.debug("One batch in arrow has no data.");
+ return null;
+ }
+ rowCountInOneBatch = root.getRowCount();
+ for (int i = 0; i < rowCountInOneBatch; ++i) {
+ rowBatch.add(new RowBatch.Row(fieldVectors.size()));
+ }
+ convertArrowToRowBatch();
+ readRowCount += root.getRowCount();
+ return this;
+ } catch (DorisException e) {
+ logger.error("Read Doris Data failed because: ", e);
+ throw new DorisRuntimeException(e.getMessage());
+ } catch (IOException e) {
+ return this;
+ }
+ }
+
public RowBatch readArrow() {
try {
this.root = arrowStreamReader.getVectorSchemaRoot();
@@ -297,6 +338,7 @@ public class RowBatch {
case "DECIMAL32":
case "DECIMAL64":
case "DECIMAL128I":
+ case "DECIMAL128":
if (!minorType.equals(Types.MinorType.DECIMAL)) {
return false;
}
@@ -320,8 +362,8 @@ public class RowBatch {
addValueToRow(rowIndex, null);
break;
}
- String stringValue = new String(date.get(rowIndex));
- LocalDate localDate = LocalDate.parse(stringValue,
dateFormatter);
+ String stringValue = new String(date.get(rowIndex),
StandardCharsets.UTF_8);
+ LocalDate localDate =
FastDateUtil.fastParseDate(stringValue, DATE_PATTERN);
addValueToRow(rowIndex, localDate);
} else {
DateDayVector date = (DateDayVector) fieldVector;
@@ -340,8 +382,11 @@ public class RowBatch {
addValueToRow(rowIndex, null);
break;
}
- String stringValue = new
String(varCharVector.get(rowIndex));
- LocalDateTime parse = LocalDateTime.parse(stringValue,
dateTimeFormatter);
+ String stringValue =
+ new String(varCharVector.get(rowIndex),
StandardCharsets.UTF_8);
+ stringValue = completeMilliseconds(stringValue);
+ LocalDateTime parse =
+ FastDateUtil.fastParseDateTime(stringValue,
DATETIME_PATTERN);
addValueToRow(rowIndex, parse);
} else if (fieldVector instanceof TimeStampVector) {
LocalDateTime dateTime = getDateTime(rowIndex,
fieldVector);
@@ -361,9 +406,11 @@ public class RowBatch {
addValueToRow(rowIndex, null);
break;
}
- String stringValue = new
String(varCharVector.get(rowIndex));
+ String stringValue =
+ new String(varCharVector.get(rowIndex),
StandardCharsets.UTF_8);
stringValue = completeMilliseconds(stringValue);
- LocalDateTime parse = LocalDateTime.parse(stringValue,
dateTimeV2Formatter);
+ LocalDateTime parse =
+ FastDateUtil.fastParseDateTimeV2(stringValue,
DATETIMEV2_PATTERN);
addValueToRow(rowIndex, parse);
} else if (fieldVector instanceof TimeStampVector) {
LocalDateTime dateTime = getDateTime(rowIndex,
fieldVector);
@@ -405,7 +452,8 @@ public class RowBatch {
addValueToRow(rowIndex, null);
break;
}
- String stringValue = new
String(largeIntVector.get(rowIndex));
+ String stringValue =
+ new String(largeIntVector.get(rowIndex),
StandardCharsets.UTF_8);
BigInteger largeInt = new BigInteger(stringValue);
addValueToRow(rowIndex, largeInt);
break;
@@ -423,7 +471,8 @@ public class RowBatch {
addValueToRow(rowIndex, null);
break;
}
- String stringValue = new String(varCharVector.get(rowIndex));
+ String stringValue =
+ new String(varCharVector.get(rowIndex),
StandardCharsets.UTF_8);
addValueToRow(rowIndex, stringValue);
break;
case "IPV6":
@@ -435,7 +484,8 @@ public class RowBatch {
addValueToRow(rowIndex, null);
break;
}
- String ipv6Str = new String(ipv6VarcharVector.get(rowIndex));
+ String ipv6Str =
+ new String(ipv6VarcharVector.get(rowIndex),
StandardCharsets.UTF_8);
String ipv6Address = IPUtils.fromBigInteger(new
BigInteger(ipv6Str));
addValueToRow(rowIndex, ipv6Address);
break;
@@ -526,6 +576,14 @@ public class RowBatch {
return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
}
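+ /**
+ * Pads a datetime string with trailing zeros (appending the "." separator first when it is
+ * missing) so that its length matches DATETIMEV2_PATTERN.
+ *
+ * <p>A switch (case/when) is used instead of a while loop. Benchmark results:
+ *
+ * <pre>
+ * "Benchmark","Mode","Threads","Samples","Score","Score Error (99.9%)","Unit"
+ * "CaseWhenTest","thrpt",1,5,40657433.897696,2515802.067503,"ops/s"
+ * "WhileTest","thrpt",1,5,9708130.819491,1207453.635429,"ops/s"
+ * </pre>
+ *
+ * @param stringValue the datetime string to pad
+ * @return the padded datetime string
+ */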
@VisibleForTesting
public static String completeMilliseconds(String stringValue) {
if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
@@ -536,14 +594,26 @@ public class RowBatch {
return stringValue;
}
- StringBuilder sb = new StringBuilder(stringValue);
if (stringValue.length() == DATETIME_PATTERN.length()) {
- sb.append(".");
+ stringValue += ".";
}
- while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
- sb.append(0);
+ int s = DATETIMEV2_PATTERN.length() - stringValue.length();
+ switch (s) {
+ case 1:
+ return stringValue + "0";
+ case 2:
+ return stringValue + "00";
+ case 3:
+ return stringValue + "000";
+ case 4:
+ return stringValue + "0000";
+ case 5:
+ return stringValue + "00000";
+ case 6:
+ return stringValue + "000000";
+ default:
+ return stringValue;
}
- return sb.toString();
}
public List<Object> next() {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
new file mode 100644
index 00000000..a2b9b632
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.reader;
+
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.exception.ShouldNeverHappenException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.SchemaUtils;
+import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.serialization.RowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static
org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
+
+public class DorisFlightValueReader extends ValueReader implements
AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisFlightValueReader.class);
+ protected AdbcConnection client;
+ protected Lock clientLock = new ReentrantLock();
+
+ private final PartitionDefinition partition;
+ private final DorisOptions options;
+ private final DorisReadOptions readOptions;
+ private AdbcStatement statement;
+ protected RowBatch rowBatch;
+ protected Schema schema;
+ AdbcStatement.QueryResult queryResult;
+ protected ArrowReader arrowReader;
+ protected AtomicBoolean eos = new AtomicBoolean(false);
+
+ public DorisFlightValueReader(
+ PartitionDefinition partition,
+ DorisOptions options,
+ DorisReadOptions readOptions,
+ Schema schema) {
+ this.partition = partition;
+ this.options = options;
+ this.readOptions = readOptions;
+ this.client = openConnection();
+ this.schema = schema;
+ init();
+ }
+
+ private void init() {
+ clientLock.lock();
+ try {
+ this.statement = this.client.createStatement();
+ this.statement.setSqlQuery(
+ RestService.parseFlightSql(readOptions, options,
partition, LOG));
+ this.queryResult = statement.executeQuery();
+ this.arrowReader = queryResult.getReader();
+ } catch (AdbcException | DorisException e) {
+ throw new RuntimeException(e);
+ } finally {
+ clientLock.unlock();
+ }
+ LOG.debug("Open scan result is, schema: {}.", schema);
+ }
+
+ private AdbcConnection openConnection() {
+ final Map<String, Object> parameters = new HashMap<>();
+ RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ FlightSqlDriver driver = new FlightSqlDriver(allocator);
+ String[] split = null;
+ try {
+ split = RestService.randomEndpoint(options.getFenodes(),
LOG).split(":");
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("Get FENode Error", e);
+ }
+ AdbcDriver.PARAM_URI.set(
+ parameters,
+ Location.forGrpcInsecure(String.valueOf(split[0]),
readOptions.getFlightSqlPort())
+ .getUri()
+ .toString());
+ AdbcDriver.PARAM_USERNAME.set(parameters, options.getUsername());
+ AdbcDriver.PARAM_PASSWORD.set(parameters, options.getPassword());
+ try {
+ AdbcDatabase adbcDatabase = driver.open(parameters);
+ return adbcDatabase.connect();
+ } catch (AdbcException e) {
+ LOG.debug("Open Flight Connection error: {}", e.getDetails());
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Reads data and caches it in rowBatch.
+ *
+ * @return true if there is a next value
+ */
+ public boolean hasNext() {
+ boolean hasNext = false;
+ clientLock.lock();
+ try {
+ // Arrow data is acquired synchronously while iterating
+ if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+ if (!eos.get()) {
+ eos.set(!arrowReader.loadNextBatch());
+ rowBatch =
+ new RowBatch(
+ arrowReader,
+ SchemaUtils.convertToSchema(
+ this.schema,
+
arrowReader.getVectorSchemaRoot().getSchema()))
+ .readFlightArrow();
+ }
+ }
+ hasNext = !eos.get();
+ return hasNext;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ clientLock.unlock();
+ }
+ }
+
+ /**
+ * get next value.
+ *
+ * @return next value
+ */
+ public List next() {
+ if (!hasNext()) {
+ LOG.error(SHOULD_NOT_HAPPEN_MESSAGE);
+ throw new ShouldNeverHappenException();
+ }
+ return rowBatch.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ clientLock.lock();
+ try {
+ if (rowBatch != null) {
+ rowBatch.close();
+ }
+ if (statement != null) {
+ statement.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ } finally {
+ clientLock.unlock();
+ }
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
index 01777d43..c9ed6f9c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -23,6 +23,7 @@ import
org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSplitRecords;
import org.slf4j.Logger;
@@ -41,7 +42,7 @@ public class DorisSourceSplitReader implements
SplitReader<List, DorisSourceSpli
private final Queue<DorisSourceSplit> splits;
private final DorisOptions options;
private final DorisReadOptions readOptions;
- private DorisValueReader valueReader;
+ private ValueReader valueReader;
private String currentSplitId;
public DorisSourceSplitReader(DorisOptions options, DorisReadOptions
readOptions) {
@@ -52,7 +53,11 @@ public class DorisSourceSplitReader implements
SplitReader<List, DorisSourceSpli
@Override
public RecordsWithSplitIds<List> fetch() throws IOException {
- checkSplitOrStartNext();
+ try {
+ checkSplitOrStartNext();
+ } catch (DorisException e) {
+ throw new RuntimeException(e);
+ }
if (!valueReader.hasNext()) {
return finishSplit();
@@ -60,7 +65,7 @@ public class DorisSourceSplitReader implements
SplitReader<List, DorisSourceSpli
return DorisSplitRecords.forRecords(currentSplitId, valueReader);
}
- private void checkSplitOrStartNext() throws IOException {
+ private void checkSplitOrStartNext() throws IOException, DorisException {
if (valueReader != null) {
return;
}
@@ -70,7 +75,8 @@ public class DorisSourceSplitReader implements
SplitReader<List, DorisSourceSpli
}
currentSplitId = nextSplit.splitId();
valueReader =
- new DorisValueReader(nextSplit.getPartitionDefinition(),
options, readOptions);
+ ValueReader.createReader(
+ nextSplit.getPartitionDefinition(), options,
readOptions, LOG);
}
private DorisSplitRecords finishSplit() {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 098a7707..35639e8a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -52,7 +52,7 @@ import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIM
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static
org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
-public class DorisValueReader implements AutoCloseable {
+public class DorisValueReader extends ValueReader implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DorisValueReader.class);
protected BackendClient client;
protected Lock clientLock = new ReentrantLock();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
new file mode 100644
index 00000000..9e453934
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+public abstract class ValueReader {
+
+ public static ValueReader createReader(
+ PartitionDefinition partition,
+ DorisOptions options,
+ DorisReadOptions readOptions,
+ Logger logger)
+ throws DorisException {
+ logger.info("create reader for partition: {}", partition);
+ if (readOptions.getUseFlightSql()) {
+ return new DorisFlightValueReader(
+ partition,
+ options,
+ readOptions,
+ RestService.getSchema(options, readOptions, logger));
+ } else {
+ return new DorisValueReader(partition, options, readOptions);
+ }
+ }
+
+ public abstract boolean hasNext();
+
+ public abstract List next();
+
+ public abstract void close() throws Exception;
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
index cef96762..24d10569 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.source.split;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.doris.flink.source.reader.DorisValueReader;
+import org.apache.doris.flink.source.reader.ValueReader;
import javax.annotation.Nullable;
@@ -34,18 +35,17 @@ import java.util.Set;
public class DorisSplitRecords implements RecordsWithSplitIds<List> {
private final Set<String> finishedSplits;
- private final DorisValueReader valueReader;
+ private final ValueReader valueReader;
private String splitId;
- public DorisSplitRecords(
- String splitId, DorisValueReader valueReader, Set<String>
finishedSplits) {
+ public DorisSplitRecords(String splitId, ValueReader valueReader,
Set<String> finishedSplits) {
this.splitId = splitId;
this.valueReader = valueReader;
this.finishedSplits = finishedSplits;
}
public static DorisSplitRecords forRecords(
- final String splitId, final DorisValueReader valueReader) {
+ final String splitId, final ValueReader valueReader) {
return new DorisSplitRecords(splitId, valueReader,
Collections.emptySet());
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 4b0b56c4..02e59084 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -307,7 +307,16 @@ public class DorisConfigOptions {
.booleanType()
.defaultValue(false)
.withDescription("Whether to use buffer cache for
breakpoint resume");
-
+ // Table option "source.use-flight-sql": boolean, defaults to false.
+ // DorisDynamicTableFactory passes its value to setUseFlightSql(...) on the read options.
+ public static final ConfigOption<Boolean> USE_FLIGHT_SQL =
+ ConfigOptions.key("source.use-flight-sql")
+ .booleanType()
+ .defaultValue(Boolean.FALSE)
+ .withDescription("use flight sql flag");
+ // Table option "source.flight-sql-port": integer, defaults to 9040.
+ // DorisDynamicTableFactory passes its value to setFlightSqlPort(...) on the read options.
+ public static final ConfigOption<Integer> FLIGHT_SQL_PORT =
+ ConfigOptions.key("source.flight-sql-port")
+ .intType()
+ .defaultValue(9040)
+ .withDescription("flight sql port");
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index e8ac4dbf..2559e1f0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -52,6 +52,7 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
import static
org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS;
@@ -83,6 +84,7 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API
import static
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+import static org.apache.doris.flink.table.DorisConfigOptions.USE_FLIGHT_SQL;
/**
* The {@link DorisDynamicTableFactory} translates the catalog table to a
table source.
@@ -157,6 +159,9 @@ public final class DorisDynamicTableFactory
options.add(SOURCE_USE_OLD_API);
options.add(SINK_WRITE_MODE);
options.add(SINK_IGNORE_COMMIT_ERROR);
+
+ options.add(USE_FLIGHT_SQL);
+ options.add(FLIGHT_SQL_PORT);
return options;
}
@@ -216,7 +221,9 @@ public final class DorisDynamicTableFactory
(int)
readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis())
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
- .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
+ .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API))
+ .setUseFlightSql(readableConfig.get(USE_FLIGHT_SQL))
+ .setFlightSqlPort(readableConfig.get(FLIGHT_SQL_PORT));
return builder.build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
new file mode 100644
index 00000000..3c24b810
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.util;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/**
+ * The idea for this utility comes from https://bugs.openjdk.org/browse/JDK-8144808.
+ * Timings quoted there: 991ms for LocalDateTime.parse(...), 246ms for LocalDateTime.of(...).
+ */
+public final class FastDateUtil {
+
+ /**
+ * Parses a date-time string with a six-digit fraction by reading fixed-width digit runs.
+ *
+ * <p>Field offsets are taken from where "yyyy", "MM", "dd", "HH", "mm", "ss" and "SSSSSS"
+ * first occur in {@code pattern}; the same offsets are then read from {@code dateTime}.
+ *
+ * <p>NOTE(review): a token missing from {@code pattern} makes {@code indexOf} return -1, so
+ * the digit loop would index the char array at -1; a {@code dateTime} shorter than the
+ * offsets require also indexes out of bounds. Callers presumably always pass matching
+ * inputs - confirm.
+ *
+ * @param dateTime text to parse, laid out positionally like {@code pattern}
+ * @param pattern layout containing all seven tokens listed above
+ * @return the parsed date-time, including the fraction as nanoseconds
+ */
+ public static LocalDateTime fastParseDateTimeV2(String dateTime, String
pattern) {
+ char[] arr = dateTime.toCharArray();
+ // Offset of each field inside the pattern; reused as the offset into the input text.
+ int[] indexes =
+ new int[] {
+ pattern.indexOf("yyyy"),
+ pattern.indexOf("MM"),
+ pattern.indexOf("dd"),
+ pattern.indexOf("HH"),
+ pattern.indexOf("mm"),
+ pattern.indexOf("ss"),
+ pattern.indexOf("SSSSSS")
+ };
+ int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+ int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+ int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+ int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2);
+ int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2);
+ int second = parseFromIndex(arr, indexes[5], indexes[5] + 2);
+ // Six fraction digits are microseconds; scale by 1000 for the nano-of-second argument.
+ int nanos = parseFromIndex(arr, indexes[6], indexes[6] + 6) * 1000;
+ return LocalDateTime.of(year, month, day, hour, minute, second, nanos);
+ }
+
+ /**
+ * Parses a date-time string without a fraction by reading fixed-width digit runs.
+ *
+ * <p>Field offsets come from the first occurrence of "yyyy", "MM", "dd", "HH", "mm" and "ss"
+ * in {@code pattern}. The same unguarded-offset caveat applies: a missing token or a too
+ * short {@code dateTime} leads to an out-of-bounds array access.
+ *
+ * @param dateTime text to parse, laid out positionally like {@code pattern}
+ * @param pattern layout containing all six tokens listed above
+ * @return the parsed date-time with second precision
+ */
+ public static LocalDateTime fastParseDateTime(String dateTime, String
pattern) {
+ char[] arr = dateTime.toCharArray();
+ // Offset of each field inside the pattern; reused as the offset into the input text.
+ int[] indexes =
+ new int[] {
+ pattern.indexOf("yyyy"),
+ pattern.indexOf("MM"),
+ pattern.indexOf("dd"),
+ pattern.indexOf("HH"),
+ pattern.indexOf("mm"),
+ pattern.indexOf("ss")
+ };
+ int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+ int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+ int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+ int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2);
+ int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2);
+ int second = parseFromIndex(arr, indexes[5], indexes[5] + 2);
+ return LocalDateTime.of(year, month, day, hour, minute, second);
+ }
+
+ /**
+ * Parses a date string by reading the year, month and day as fixed-width digit runs.
+ *
+ * <p>Field offsets come from the first occurrence of "yyyy", "MM" and "dd" in
+ * {@code pattern}; a missing token or a too short input leads to an out-of-bounds access.
+ *
+ * @param dateTime text to parse, laid out positionally like {@code pattern}
+ * @param pattern layout containing the three tokens listed above
+ * @return the parsed date
+ */
+ public static LocalDate fastParseDate(String dateTime, String pattern) {
+ char[] arr = dateTime.toCharArray();
+ int[] indexes =
+ new int[] {
+ pattern.indexOf("yyyy"), pattern.indexOf("MM"),
pattern.indexOf("dd"),
+ };
+ int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+ int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+ int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+ return LocalDate.of(year, month, day);
+ }
+
+ /**
+ * Reads the characters in {@code [start, end)} as a base-10 number.
+ *
+ * <p>Characters are not checked to be digits: any other character is folded into the result
+ * through its distance from '0' instead of raising an error.
+ */
+ private static int parseFromIndex(char[] arr, int start, int end) {
+ int value = 0;
+ for (int i = start; i < end; i++) {
+ value = value * 10 + (arr[i] - '0');
+ }
+ return value;
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java
new file mode 100644
index 00000000..2353bd7d
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.rest;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.doris.flink.exception.DorisException;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/** Unit test for {@code SchemaUtils.convertToSchema}. */
+public class SchemaUtilsTest {
+ private static final Logger logger =
LoggerFactory.getLogger(SchemaUtilsTest.class);
+
+ /**
+ * Converts a two-column schema and checks that both columns survive, in order.
+ *
+ * <p>NOTE(review): {@code @Test} is imported from JUnit Jupiter while {@code assertEquals}
+ * is imported from {@code org.junit.Assert}; confirm that mixing the two is intended.
+ */
+ @Test
+ public void convertToSchema() throws DorisException {
+ // Arrow side: two non-nullable, signed 32-bit integer fields.
+ Field field1 =
+ new Field("field1", FieldType.notNullable(new
ArrowType.Int(32, true)), null);
+ Field field2 =
+ new Field("field2", FieldType.notNullable(new
ArrowType.Int(32, true)), null);
+ Schema arrowSchema = new Schema(Arrays.asList(field1, field2));
+ // Doris side: JSON schema text with the same two columns, both of type "int".
+ String schemaStr =
+ "{\"properties\":["
+ +
"{\"type\":\"int\",\"name\":\"field1\",\"comment\":\"\"}"
+ +
",{\"type\":\"int\",\"name\":\"field2\",\"comment\":\"\"}"
+ + "], \"status\":200}";
+ org.apache.doris.flink.rest.models.Schema schema =
+ RestService.parseSchema(schemaStr, logger);
+
+ org.apache.doris.flink.rest.models.Schema result =
+ SchemaUtils.convertToSchema(schema, arrowSchema);
+
+ // Only the column count and the column names/order are asserted.
+ assertEquals(2, result.getProperties().size());
+ assertEquals("field1", result.getProperties().get(0).getName());
+ assertEquals("field2", result.getProperties().get(1).getName());
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 0004af05..05a93dc5 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -65,7 +65,8 @@ public class DorisDynamicTableFactoryTest {
properties.put("lookup.jdbc.read.batch.size", "16");
properties.put("lookup.jdbc.read.batch.queue-size", "16");
properties.put("lookup.jdbc.read.thread-size", "1");
-
+ properties.put("source.use-flight-sql", "false");
+ properties.put("source.flight-sql-port", "9040");
DynamicTableSource actual = createTableSource(SCHEMA, properties);
DorisOptions options =
DorisOptions.builder()
@@ -98,7 +99,9 @@ public class DorisDynamicTableFactoryTest {
.setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
- .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT);
+ .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
+ .setUseFlightSql(false)
+ .setFlightSqlPort(9040);
DorisDynamicTableSource expected =
new DorisDynamicTableSource(
options,
@@ -182,7 +185,9 @@ public class DorisDynamicTableFactoryTest {
.setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
- .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT);
+ .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
+ .setUseFlightSql(false)
+ .setFlightSqlPort(9040);
DorisDynamicTableSink expected =
new DorisDynamicTableSink(
options,
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
new file mode 100644
index 00000000..a89e50a0
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.utils;
+
+import org.apache.doris.flink.util.FastDateUtil;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Happy-path tests for the three public parsers of {@code FastDateUtil}. */
+public class FastDateUtilTest {
+
+ /** A six-digit fraction ".123456" must come back as 123456000 nanoseconds. */
+ @Test
+ void
fastParseDateTimeV2_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() {
+ String dateTime = "2023-10-05 14:30:45.123456";
+ String pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ LocalDateTime result = FastDateUtil.fastParseDateTimeV2(dateTime,
pattern);
+ assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45, 123456000),
result);
+ }
+
+ /** A second-precision input must parse to the matching {@code LocalDateTime}. */
+ @Test
+ void
fastParseDateTime_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() {
+ String dateTime = "2023-10-05 14:30:45";
+ String pattern = "yyyy-MM-dd HH:mm:ss";
+ LocalDateTime result = FastDateUtil.fastParseDateTime(dateTime,
pattern);
+ assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45), result);
+ }
+
+ /** A date-only input must parse to the matching {@code LocalDate}. */
+ @Test
+ void fastParseDate_withValidDateAndPattern_returnsCorrectLocalDate() {
+ String dateTime = "2023-10-05";
+ String pattern = "yyyy-MM-dd";
+ LocalDate result = FastDateUtil.fastParseDate(dateTime, pattern);
+ assertEquals(LocalDate.of(2023, 10, 5), result);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]