This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 706baa0 [Feature] Refactoring DorisSource based on FLIP-27 (#24)
706baa0 is described below
commit 706baa0348224059c3754a78d0a957e0258811ca
Author: wudi <[email protected]>
AuthorDate: Thu Apr 21 10:40:47 2022 +0800
[Feature] Refactoring DorisSource based on FLIP-27 (#24)
* Refactoring DorisSource based on FLIP-27
---
.../apache/doris/flink/cfg/DorisReadOptions.java | 17 ++-
.../DorisDeserializationSchema.java | 8 ++
.../RowDataDeserializationSchema.java | 50 ++++++++
.../SimpleListDeserializationSchema.java | 7 +-
.../converter/DorisRowConverter.java | 128 +++++++++++++++++++++
.../doris/flink/rest/PartitionDefinition.java | 14 +--
.../org/apache/doris/flink/rest/RestService.java | 2 +-
.../apache/doris/flink/serialization/RowBatch.java | 5 +-
.../doris/flink/sink/writer/DorisStreamLoad.java | 14 +--
.../org/apache/doris/flink/source/DorisSource.java | 123 ++++++++++++++++++++
.../doris/flink/source/DorisSourceBuilder.java | 68 +++++++++++
.../flink/source/assigners/DorisSplitAssigner.java | 53 +++++++++
.../source/assigners/SimpleSplitAssigner.java | 58 ++++++++++
.../source/enumerator/DorisSourceEnumerator.java | 97 ++++++++++++++++
.../source/enumerator/PendingSplitsCheckpoint.java | 50 ++++++++
.../PendingSplitsCheckpointSerializer.java | 124 ++++++++++++++++++++
.../flink/source/reader/DorisRecordEmitter.java | 62 ++++++++++
.../flink/source/reader/DorisSourceReader.java | 69 +++++++++++
.../source/reader/DorisSourceSplitReader.java | 103 +++++++++++++++++
.../doris/flink/source/split/DorisSourceSplit.java | 73 ++++++++++++
.../source/split/DorisSourceSplitSerializer.java | 110 ++++++++++++++++++
.../split/DorisSourceSplitState.java} | 17 ++-
.../flink/source/split/DorisSplitRecords.java | 81 +++++++++++++
.../flink/table/DorisDynamicTableFactory.java | 15 ++-
.../doris/flink/table/DorisDynamicTableSource.java | 48 +++++---
.../doris/flink/table/DorisRowDataInputFormat.java | 22 ++--
.../RowDataDeserializationSchemaTest.java | 65 +++++++++++
.../convert/DorisRowConverterTest.java | 62 ++++++++++
.../org/apache/doris/flink/sink/OptionUtils.java | 27 +++--
.../doris/flink/source/DorisSourceExampleTest.java | 48 ++++++++
.../PendingSplitsCheckpointSerializerTest.java | 50 ++++++++
.../flink/source/reader/DorisSourceReaderTest.java | 69 +++++++++++
.../flink/source/reader/TestingReaderContext.java | 98 ++++++++++++++++
.../split/DorisSourceSplitSerializerTest.java | 44 +++++++
.../flink/source/split/DorisSplitRecordsTest.java} | 23 ++--
.../flink/table/DorisDynamicTableSourceTest.java | 86 ++++++++++++++
.../org/apache/doris/flink/utils/FactoryMocks.java | 46 ++++++++
37 files changed, 1953 insertions(+), 83 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 0beb18c..aead8a8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -37,10 +37,11 @@ public class DorisReadOptions implements Serializable {
private Long execMemLimit;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
+ private boolean useOldApi;
public DorisReadOptions(String readFields, String filterQuery, Integer
requestTabletSize, Integer requestConnectTimeoutMs, Integer
requestReadTimeoutMs,
Integer requestQueryTimeoutS, Integer
requestRetries, Integer requestBatchSize, Long execMemLimit,
- Integer deserializeQueueSize, Boolean
deserializeArrowAsync) {
+ Integer deserializeQueueSize, Boolean
deserializeArrowAsync, boolean useOldApi) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
@@ -52,6 +53,7 @@ public class DorisReadOptions implements Serializable {
this.execMemLimit = execMemLimit;
this.deserializeQueueSize = deserializeQueueSize;
this.deserializeArrowAsync = deserializeArrowAsync;
+ this.useOldApi = useOldApi;
}
public String getReadFields() {
@@ -98,12 +100,15 @@ public class DorisReadOptions implements Serializable {
return deserializeArrowAsync;
}
+ public boolean getUseOldApi() {
+ return useOldApi;
+ }
public static Builder builder() {
return new Builder();
}
- public static DorisReadOptions defaults(){
+ public static DorisReadOptions defaults() {
return DorisReadOptions.builder().build();
}
@@ -123,6 +128,7 @@ public class DorisReadOptions implements Serializable {
private Long execMemLimit;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
+ private Boolean useOldApi = false;
public Builder setReadFields(String readFields) {
@@ -180,8 +186,13 @@ public class DorisReadOptions implements Serializable {
return this;
}
+ public Builder setUseOldApi(boolean useOldApi) {
+ this.useOldApi = useOldApi;
+ return this;
+ }
+
public DorisReadOptions build() {
- return new DorisReadOptions(readFields, filterQuery,
requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs,
requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit,
deserializeQueueSize, deserializeArrowAsync);
+ return new DorisReadOptions(readFields, filterQuery,
requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs,
requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit,
deserializeQueueSize, deserializeArrowAsync, useOldApi);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
index 2aaec99..608fa5c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
@@ -17,8 +17,16 @@
package org.apache.doris.flink.deserialization;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
import java.io.Serializable;
+import java.util.List;
+/**
+ * The deserialization schema describes how to turn the doris list record into
data types
+ * (Java/Scala objects) that are processed by Flink.
+ **/
public interface DorisDeserializationSchema<T> extends Serializable,
ResultTypeQueryable<T> {
+
+ void deserialize(List<?> record, Collector<T> out) throws Exception;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
new file mode 100644
index 0000000..f9c65b3
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization;
+
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import java.util.List;
+
+
+/**
+ * A simple implementation of {@link DorisDeserializationSchema} which
converts the received
+ * list record into {@link GenericRowData}.
+ */
+public class RowDataDeserializationSchema implements
DorisDeserializationSchema<RowData> {
+
+ private final DorisRowConverter rowConverter;
+
+ public RowDataDeserializationSchema(RowType rowType) {
+ this.rowConverter = new DorisRowConverter(rowType);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return TypeInformation.of(RowData.class);
+ }
+
+ @Override
+ public void deserialize(List<?> record, Collector<RowData> out) throws
Exception {
+ RowData row = rowConverter.convert(record);
+ out.collect(row);
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index d9ec6e5..43e68ef 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -19,10 +19,10 @@ package org.apache.doris.flink.deserialization;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
import java.util.List;
-
public class SimpleListDeserializationSchema implements
DorisDeserializationSchema<List<?>> {
@Override
@@ -30,4 +30,9 @@ public class SimpleListDeserializationSchema implements
DorisDeserializationSche
return TypeInformation.of(new TypeHint<List<?>>() {
});
}
+
+ @Override
+ public void deserialize(List<?> record, Collector<List<?>> out) throws
Exception {
+ out.collect(record);
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
new file mode 100644
index 0000000..b84fcdb
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization.converter;
+
+import org.apache.doris.flink.serialization.RowBatch;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class DorisRowConverter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final DeserializationConverter[] deserializationConverters;
+
+ public DorisRowConverter(RowType rowType) {
+ checkNotNull(rowType);
+ this.deserializationConverters = new
DeserializationConverter[rowType.getFieldCount()];
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ deserializationConverters[i] =
createNullableConverter(rowType.getTypeAt(i));
+ }
+ }
+
+ /**
+ * Convert data retrieved from {@link RowBatch} to {@link RowData}.
+ *
+ * @param record from rowBatch
+ */
+ public GenericRowData convert(List record){
+ GenericRowData rowData = new GenericRowData(record.size());
+ for (int i = 0; i < record.size(); i++) {
+ rowData.setField(i,
deserializationConverters[i].deserialize(record.get(i)));
+ }
+ return rowData;
+ }
+
+
+ /**
+ * Create a nullable runtime {@link DeserializationConverter} from given
{@link
+ * LogicalType}.
+ */
+ protected DeserializationConverter createNullableConverter(LogicalType
type) {
+ return wrapIntoNullableInternalConverter(createConverter(type));
+ }
+
+ protected DeserializationConverter wrapIntoNullableInternalConverter(
+ DeserializationConverter deserializationConverter) {
+ return val -> {
+ if (val == null) {
+ return null;
+ } else {
+ return deserializationConverter.deserialize(val);
+ }
+ };
+ }
+
+ /** Runtime converter to convert doris field to {@link RowData} type
object. */
+ @FunctionalInterface
+ interface DeserializationConverter extends Serializable {
+ /**
+ * Convert a doris field object of {@link RowBatch } to the data
structure object.
+ *
+ * @param field
+ */
+ Object deserialize(Object field);
+ }
+
+ protected DeserializationConverter createConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return val -> null;
+ case BOOLEAN:
+ case FLOAT:
+ case DOUBLE:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ return val -> val;
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ return val -> DecimalData.fromBigDecimal((BigDecimal) val,
precision, scale);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case DATE:
+ case CHAR:
+ case VARCHAR:
+ return val -> StringData.fromString((String) val);
+ case TIME_WITHOUT_TIME_ZONE:
+ case BINARY:
+ case VARBINARY:
+ case ARRAY:
+ case ROW:
+ case MAP:
+ case MULTISET:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" +
type);
+ }
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
index 8a66f76..e753d66 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
@@ -17,8 +17,6 @@
package org.apache.doris.flink.rest;
-import org.apache.doris.flink.cfg.DorisOptions;
-
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
@@ -35,16 +33,10 @@ public class PartitionDefinition implements Serializable,
Comparable<PartitionDe
private final String beAddress;
private final Set<Long> tabletIds;
private final String queryPlan;
- private final String serializedSettings;
public PartitionDefinition(String database, String table,
- DorisOptions settings, String beAddress,
Set<Long> tabletIds, String queryPlan)
+ String beAddress, Set<Long> tabletIds, String
queryPlan)
throws IllegalArgumentException {
- if (settings != null) {
- this.serializedSettings = settings.save();
- } else {
- this.serializedSettings = null;
- }
this.database = database;
this.table = table;
this.beAddress = beAddress;
@@ -72,7 +64,6 @@ public class PartitionDefinition implements Serializable,
Comparable<PartitionDe
return queryPlan;
}
-
@Override
public int compareTo(PartitionDefinition o) {
int cmp = database.compareTo(o.database);
@@ -123,8 +114,7 @@ public class PartitionDefinition implements Serializable,
Comparable<PartitionDe
Objects.equals(table, that.table) &&
Objects.equals(beAddress, that.beAddress) &&
Objects.equals(tabletIds, that.tabletIds) &&
- Objects.equals(queryPlan, that.queryPlan) &&
- Objects.equals(serializedSettings, that.serializedSettings);
+ Objects.equals(queryPlan, that.queryPlan) ;
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 6ac84bc..734bfdb 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -675,7 +675,7 @@ public class RestService implements Serializable {
first, Math.min(beInfo.getValue().size(), first +
tabletsSize)));
first = first + tabletsSize;
PartitionDefinition partitionDefinition =
- new PartitionDefinition(database, table, options,
+ new PartitionDefinition(database, table,
beInfo.getKey(), partitionTablets,
opaquedQueryPlan);
logger.debug("Generate one PartitionDefinition '{}'.",
partitionDefinition);
partitions.add(partitionDefinition);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index d235aa9..3be6f87 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -36,6 +36,8 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -44,9 +46,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* row batch data container.
*/
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 7849559..b468cc8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -17,10 +17,6 @@
package org.apache.doris.flink.sink.writer;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -31,7 +27,9 @@ import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.ResponseUtil;
-
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -41,7 +39,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -53,11 +50,8 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import static org.apache.doris.flink.sink.LoadStatus.FAIL;
-import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
-import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
-import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
-import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
new file mode 100644
index 0000000..e2b6d41
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.assigners.DorisSplitAssigner;
+import org.apache.doris.flink.source.assigners.SimpleSplitAssigner;
+import org.apache.doris.flink.source.enumerator.DorisSourceEnumerator;
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import
org.apache.doris.flink.source.enumerator.PendingSplitsCheckpointSerializer;
+import org.apache.doris.flink.source.reader.DorisRecordEmitter;
+import org.apache.doris.flink.source.reader.DorisSourceReader;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.source.split.DorisSourceSplitSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * DorisSource based on FLIP-27 which is a BOUNDED stream.
+ **/
+public class DorisSource<OUT> implements Source<OUT, DorisSourceSplit,
PendingSplitsCheckpoint>,
+ ResultTypeQueryable<OUT> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisSource.class);
+
+ private final DorisOptions options;
+ private final DorisReadOptions readOptions;
+
+ // Boundedness
+ private final Boundedness boundedness;
+ private final DorisDeserializationSchema<OUT> deserializer;
+
+ public DorisSource(DorisOptions options,
+ DorisReadOptions readOptions,
+ Boundedness boundedness,
+ DorisDeserializationSchema<OUT> deserializer) {
+ this.options = options;
+ this.readOptions = readOptions;
+ this.boundedness = boundedness;
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return this.boundedness;
+ }
+
+ @Override
+ public SourceReader<OUT, DorisSourceSplit>
createReader(SourceReaderContext readerContext) throws Exception {
+ return new DorisSourceReader<>(
+ options,
+ readOptions,
+ new DorisRecordEmitter<>(deserializer),
+ readerContext,
+ readerContext.getConfiguration()
+ );
+ }
+
+ @Override
+ public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint>
createEnumerator(SplitEnumeratorContext<DorisSourceSplit> context) throws
Exception {
+ List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
+ List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
+ partitions.forEach(m -> dorisSourceSplits.add(new
DorisSourceSplit(m)));
+ DorisSplitAssigner splitAssigner = new
SimpleSplitAssigner(dorisSourceSplits);
+
+ return new DorisSourceEnumerator(context, splitAssigner);
+ }
+
+ @Override
+ public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint>
restoreEnumerator(
+ SplitEnumeratorContext<DorisSourceSplit> context,
+ PendingSplitsCheckpoint checkpoint) throws Exception {
+ Collection<DorisSourceSplit> splits = checkpoint.getSplits();
+ DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(splits);
+ return new DorisSourceEnumerator(context, splitAssigner);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<DorisSourceSplit> getSplitSerializer() {
+ return DorisSourceSplitSerializer.INSTANCE;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PendingSplitsCheckpoint>
getEnumeratorCheckpointSerializer() {
+ return new PendingSplitsCheckpointSerializer(getSplitSerializer());
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializer.getProducedType();
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
new file mode 100644
index 0000000..8d96c08
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+
+/**
+ * The builder class for {@link DorisSource} to make it easier for the users
to construct a {@link
+ * DorisSource}.
+ **/
+public class DorisSourceBuilder<OUT> {
+
+ private DorisOptions options;
+ private DorisReadOptions readOptions;
+
+ // Boundedness
+ private Boundedness boundedness;
+ private DorisDeserializationSchema<OUT> deserializer;
+
+ DorisSourceBuilder() {
+ boundedness = Boundedness.BOUNDED;
+ }
+
+ public static <OUT> DorisSourceBuilder<OUT> builder() {
+ return new DorisSourceBuilder();
+ }
+
+ public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions
readOptions) {
+ this.readOptions = readOptions;
+ return this;
+ }
+
+ public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) {
+ this.boundedness = boundedness;
+ return this;
+ }
+
+ public DorisSourceBuilder<OUT>
setDeserializer(DorisDeserializationSchema<OUT> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ public DorisSource<OUT> build() {
+ return new DorisSource<>(options, readOptions, boundedness,
deserializer);
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java
new file mode 100644
index 0000000..bf541b3
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.assigners;
+
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code DorisSplitAssigner} is responsible for deciding what split
should be processed. It
+ * determines split processing order.
+ */
+public interface DorisSplitAssigner {
+
+ /**
+ * Gets the next split.
+ *
+ * <p>When this method returns an empty {@code Optional}, then the set of
splits is assumed to
+ * be done and the source will finish once the readers finished their
current splits.
+ */
+ Optional<DorisSourceSplit> getNext(@Nullable String hostname);
+
+ /**
+ * Adds a set of splits to this assigner. This happens for example when
some split processing
+ * failed and the splits need to be re-added, or when new splits got
discovered.
+ */
+ void addSplits(Collection<DorisSourceSplit> splits);
+
+ /**
+ * Creates a snapshot of the state of this split assigner, to be stored in
a checkpoint.
+ *
+ * @param checkpointId The ID of the checkpoint for which the snapshot is
created.
+ * @return an object containing the state of the split enumerator.
+ */
+ PendingSplitsCheckpoint snapshotState(long checkpointId);
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
new file mode 100644
index 0000000..eef17f2
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.assigners;

import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
import org.apache.doris.flink.source.split.DorisSourceSplit;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

/**
 * The {@code SimpleSplitAssigner} hands out splits in a random order.
 **/
public class SimpleSplitAssigner implements DorisSplitAssigner {

    /** Pending, not-yet-assigned splits; splits are handed out from the tail of this list. */
    private final ArrayList<DorisSourceSplit> splits;

    public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) {
        this.splits = new ArrayList<>(splits);
    }

    @Override
    public Optional<DorisSourceSplit> getNext(@Nullable String hostname) {
        // hostname is ignored: this assigner performs no locality-aware assignment.
        final int size = splits.size();
        return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
    }

    @Override
    public void addSplits(Collection<DorisSourceSplit> splits) {
        // Bug fix: the parameter shadows the field, so the original "splits.addAll(splits)"
        // added the incoming collection to itself and never updated this assigner's pending
        // list — re-added splits after a failure were silently dropped.
        this.splits.addAll(splits);
    }

    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
        return new PendingSplitsCheckpoint(splits);
    }

    @Override
    public String toString() {
        return "SimpleSplitAssigner " + splits;
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
new file mode 100644
index 0000000..3275a22
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.enumerator;

import org.apache.doris.flink.source.DorisSource;
import org.apache.doris.flink.source.assigners.DorisSplitAssigner;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A SplitEnumerator implementation for bounded / batch {@link DorisSource} input.
 *
 * <p>This enumerator takes all backend tablets and assigns them to the readers.
 * Once tablets are processed, the source is finished.
 */
public class DorisSourceEnumerator
        implements SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> {

    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceEnumerator.class);

    /** Context used to hand splits to the requesting readers. */
    private final SplitEnumeratorContext<DorisSourceSplit> context;

    /** Decides which pending split is handed out next. */
    private final DorisSplitAssigner splitAssigner;

    public DorisSourceEnumerator(SplitEnumeratorContext<DorisSourceSplit> context,
                                 DorisSplitAssigner splitAssigner) {
        this.splitAssigner = checkNotNull(splitAssigner);
        this.context = context;
    }

    @Override
    public void start() {
        // no resources to start
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String hostname) {
        // The reader may have failed between sending the request and now; skip stale requests.
        if (!context.registeredReaders().containsKey(subtaskId)) {
            return;
        }

        final Optional<DorisSourceSplit> candidate = splitAssigner.getNext(hostname);
        if (!candidate.isPresent()) {
            context.signalNoMoreSplits(subtaskId);
            LOG.info("No more splits available for subtask {}", subtaskId);
            return;
        }

        final DorisSourceSplit assigned = candidate.get();
        context.assignSplit(assigned, subtaskId);
        LOG.info("Assigned split to subtask {} : {}", subtaskId, assigned);
    }

    @Override
    public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
        // Splits come back when a reader fails before finishing them; return them to the assigner.
        LOG.debug("Doris Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    @Override
    public void addReader(int subtaskId) {
        // do nothing: splits are pulled by readers via handleSplitRequest
    }

    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        // no resources to close
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpoint.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpoint.java
new file mode 100644
index 0000000..3edba0f
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpoint.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.enumerator;

import org.apache.doris.flink.source.split.DorisSourceSplit;

import javax.annotation.Nullable;
import java.util.Collection;

/**
 * A checkpoint of the current enumerator state, containing the currently pending splits that are
 * not yet assigned.
 */
public class PendingSplitsCheckpoint {

    /**
     * The splits in the checkpoint.
     */
    private final Collection<DorisSourceSplit> splits;

    /**
     * The cached byte representation from the last serialization step. This helps to avoid paying
     * repeated serialization cost for the same checkpoint object. This field is used by {@link
     * PendingSplitsCheckpointSerializer}.
     */
    @Nullable
    byte[] serializedFormCache;

    public PendingSplitsCheckpoint(Collection<DorisSourceSplit> splits) {
        this.splits = splits;
    }

    /** Returns the pending splits captured by this checkpoint. */
    public Collection<DorisSourceSplit> getSplits() {
        return splits;
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..deeb544
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializer.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.enumerator;

import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A serializer for the {@link PendingSplitsCheckpoint}.
 *
 * <p>Wire format (all ints little-endian): magic number, split-serializer version, split count,
 * then each split as a length-prefixed byte array.
 */
public class PendingSplitsCheckpointSerializer
        implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {

    private static final int VERSION = 1;

    /** Magic number written at the head of every serialized checkpoint, as a corruption guard. */
    private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;

    /** Serializer used for the individual splits inside the checkpoint. */
    private final SimpleVersionedSerializer<DorisSourceSplit> splitSerializer;

    public PendingSplitsCheckpointSerializer(SimpleVersionedSerializer<DorisSourceSplit> splitSerializer) {
        this.splitSerializer = checkNotNull(splitSerializer);
    }

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
        // optimization: the checkpoint lazily caches its own serialized form
        if (checkpoint.serializedFormCache != null) {
            return checkpoint.serializedFormCache;
        }

        Collection<DorisSourceSplit> splits = checkpoint.getSplits();
        final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());

        // Fix: the original comment claimed "four ints"; 12 bytes is three ints:
        // magic number, split-serializer version, split count.
        int totalLen = 12;

        for (DorisSourceSplit split : splits) {
            final byte[] serSplit = splitSerializer.serialize(split);
            serializedSplits.add(serSplit);
            totalLen += serSplit.length + 4; // 4 bytes for the length field
        }

        final byte[] result = new byte[totalLen];
        final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
        byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
        byteBuffer.putInt(splitSerializer.getVersion());
        byteBuffer.putInt(serializedSplits.size());

        for (byte[] splitBytes : serializedSplits) {
            byteBuffer.putInt(splitBytes.length);
            byteBuffer.put(splitBytes);
        }

        // Fix: the original used "assert", which is a no-op unless the JVM runs with -ea; an
        // explicit check ensures a length-accounting bug cannot silently corrupt a checkpoint.
        if (byteBuffer.remaining() != 0) {
            throw new IllegalStateException(
                    "Serialization buffer not fully used: " + byteBuffer.remaining() + " bytes remaining");
        }

        // optimization: cache the serialized form, so we avoid the byte work during repeated
        // serialization
        checkpoint.serializedFormCache = result;

        return result;
    }

    @Override
    public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
        if (version == 1) {
            return deserialize(serialized);
        }
        throw new IOException("Unknown version: " + version);
    }

    /**
     * Deserializes the version-1 format written by {@link #serialize(PendingSplitsCheckpoint)}.
     *
     * @throws IOException if the magic number does not match (corrupted or foreign bytes)
     */
    private PendingSplitsCheckpoint deserialize(byte[] serialized) throws IOException {
        final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);

        final int magic = bb.getInt();
        if (magic != VERSION_1_MAGIC_NUMBER) {
            throw new IOException(
                    String.format(
                            "Invalid magic number for PendingSplitsCheckpoint. "
                                    + "Expected: %X , found %X",
                            VERSION_1_MAGIC_NUMBER, magic));
        }

        final int splitSerializerVersion = bb.getInt();
        final int numSplits = bb.getInt();

        final ArrayList<DorisSourceSplit> splits = new ArrayList<>(numSplits);
        for (int remaining = numSplits; remaining > 0; remaining--) {
            final byte[] bytes = new byte[bb.getInt()];
            bb.get(bytes);
            final DorisSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, bytes);
            splits.add(split);
        }
        return new PendingSplitsCheckpoint(splits);
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisRecordEmitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisRecordEmitter.java
new file mode 100644
index 0000000..61c31fc
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisRecordEmitter.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.reader;

import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.source.split.DorisSourceSplitState;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * The {@link RecordEmitter} implementation for {@link DorisSourceReader}.
 **/
public class DorisRecordEmitter<T>
        implements RecordEmitter<List, T, DorisSourceSplitState> {

    /** Deserializer that turns a raw row (a {@code List} of field values) into records of type T. */
    private final DorisDeserializationSchema<T> deserializationSchema;

    /** Reusable adapter bridging the deserializer's Collector onto the reader's SourceOutput. */
    private final SourceOutputCollector<T> collector = new SourceOutputCollector<>();

    public DorisRecordEmitter(DorisDeserializationSchema<T> dorisDeserializationSchema) {
        this.deserializationSchema = dorisDeserializationSchema;
    }

    @Override
    public void emitRecord(List value, SourceOutput<T> output, DorisSourceSplitState splitState) throws Exception {
        // Point the shared adapter at the current output before deserializing;
        // the deserializer pushes zero or more records through it.
        collector.output = output;
        deserializationSchema.deserialize(value, collector);
    }

    /** A {@link Collector} that forwards every collected record to a {@link SourceOutput}. */
    private static final class SourceOutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;

        @Override
        public void collect(T record) {
            output.collect(record);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
new file mode 100644
index 0000000..db63d19
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.reader;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSourceSplitState;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;

import java.util.List;
import java.util.Map;

/**
 * A {@link SourceReader} that read records from {@link DorisSourceSplit}.
 *
 * <p>Fetching is delegated to a {@link DorisSourceSplitReader} created per reader; record
 * translation is done by the supplied {@link RecordEmitter}.
 **/
public class DorisSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<List, T, DorisSourceSplit, DorisSourceSplitState> {


    public DorisSourceReader(DorisOptions options,
                             DorisReadOptions readOptions,
                             RecordEmitter<List, T, DorisSourceSplitState> recordEmitter,
                             SourceReaderContext context,
                             Configuration config) {
        // The supplier creates one split reader per fetcher thread, bound to these options.
        super(() -> new DorisSourceSplitReader(options, readOptions), recordEmitter, config, context);
    }

    @Override
    public void start() {
        // we request a split only if we did not get splits during the checkpoint restore
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    @Override
    protected void onSplitFinished(Map<String, DorisSourceSplitState> finishedSplitIds) {
        // Each finished split triggers a request for the next one (pull-based assignment).
        context.sendSplitRequest();
    }

    @Override
    protected DorisSourceSplitState initializedState(DorisSourceSplit split) {
        return new DorisSourceSplitState(split);
    }

    @Override
    protected DorisSourceSplit toSplitType(String splitId, DorisSourceSplitState splitState) {
        return splitState.toDorisSourceSplit();
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
new file mode 100644
index 0000000..aadf6fd
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.reader;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.datastream.ScalaValueReader;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSplitRecords;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * The {@link SplitReader} implementation for the doris source.
 *
 * <p>Processes one split at a time: a {@link ScalaValueReader} is opened for the head of the
 * pending-split queue and drained across successive {@link #fetch()} calls.
 **/
public class DorisSourceSplitReader
        implements SplitReader<List, DorisSourceSplit> {

    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceSplitReader.class);

    // Splits assigned by the enumerator but not yet opened.
    private final Queue<DorisSourceSplit> splits;
    private final DorisOptions options;
    private final DorisReadOptions readOptions;
    // Reader for the split currently being consumed; null when no split is open.
    private ScalaValueReader scalaValueReader;
    // Id of the split currently being consumed; null when no split is open.
    private String currentSplitId;

    public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) {
        this.options = options;
        this.readOptions = readOptions;
        this.splits = new ArrayDeque<>();
    }

    /**
     * Returns the next batch of records from the current split, or a "finished split" marker
     * once the current split's reader is exhausted.
     *
     * @throws IOException if no split remains to fetch from
     */
    @Override
    public RecordsWithSplitIds<List> fetch() throws IOException {
        checkSplitOrStartNext();

        if (!scalaValueReader.hasNext()) {
            return finishSplit();
        }
        return DorisSplitRecords.forRecords(currentSplitId, scalaValueReader);
    }

    // Opens a reader for the next queued split if none is currently open.
    private void checkSplitOrStartNext() throws IOException {
        if (scalaValueReader != null) {
            return;
        }
        final DorisSourceSplit nextSplit = splits.poll();
        if (nextSplit == null) {
            throw new IOException("Cannot fetch from another split - no split remaining");
        }
        currentSplitId = nextSplit.splitId();
        scalaValueReader = new ScalaValueReader(nextSplit.getPartitionDefinition(), options, readOptions);
    }

    // Closes the current reader and emits the finished-split marker; order matters:
    // the reader is released before the split id is cleared.
    private DorisSplitRecords finishSplit() {
        if (scalaValueReader != null) {
            scalaValueReader.close();
            scalaValueReader = null;
        }
        final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId);
        currentSplitId = null;
        return finishRecords;
    }

    @Override
    public void handleSplitsChanges(SplitsChange<DorisSourceSplit> splitsChange) {
        LOG.debug("Handling split change {}", splitsChange);
        splits.addAll(splitsChange.splits());
    }

    @Override
    public void wakeUp() {
        // fetch() does not block indefinitely, so there is nothing to interrupt.
    }

    @Override
    public void close() throws Exception {
        // Release the in-flight reader if the task shuts down mid-split.
        if (scalaValueReader != null) {
            scalaValueReader.close();
        }
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
new file mode 100644
index 0000000..c6a644d
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.split;

import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.flink.api.connector.source.SourceSplit;

import javax.annotation.Nullable;
import java.util.Objects;

/**
 * A {@link SourceSplit} that represents a {@link PartitionDefinition}.
 **/
public class DorisSourceSplit implements SourceSplit {

    private final PartitionDefinition partitionDefinition;

    /**
     * The splits are frequently serialized into checkpoints. Caching the byte representation makes
     * repeated serialization cheap. This field is used by {@link DorisSourceSplitSerializer}.
     */
    @Nullable
    transient byte[] serializedFormCache;

    public DorisSourceSplit(PartitionDefinition partitionDefinition) {
        this.partitionDefinition = partitionDefinition;
    }

    @Override
    public String splitId() {
        // NOTE(review): the id is the backend address, so two splits on the same BE would share
        // an id — confirm at most one split per backend is ever produced.
        return partitionDefinition.getBeAddress();
    }

    public PartitionDefinition getPartitionDefinition() {
        return partitionDefinition;
    }

    @Override
    public String toString() {
        return String.format("DorisSourceSplit: %s.%s,be=%s,tablets=%s",
                partitionDefinition.getDatabase(),
                partitionDefinition.getTable(),
                partitionDefinition.getBeAddress(),
                partitionDefinition.getTabletIds());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DorisSourceSplit that = (DorisSourceSplit) o;

        return Objects.equals(partitionDefinition, that.partitionDefinition);
    }

    // Bug fix: the original overrode equals without hashCode, breaking the Object contract —
    // equal splits could land in different hash buckets when used as map/set keys.
    @Override
    public int hashCode() {
        return Objects.hashCode(partitionDefinition);
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
new file mode 100644
index 0000000..d667ed6
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package org.apache.doris.flink.source.split;

import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * A serializer for the {@link DorisSourceSplit}.
 *
 * <p>Wire format (version 1): database, table, BE address and query plan as modified-UTF strings,
 * plus a length-prefixed array of tablet ids.
 **/
public class DorisSourceSplitSerializer
        implements SimpleVersionedSerializer<DorisSourceSplit> {

    public static final DorisSourceSplitSerializer INSTANCE = new DorisSourceSplitSerializer();

    // Reuse one output buffer per thread to avoid re-allocating it for every split.
    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

    private static final int VERSION = 1;

    // Writes a length-prefixed array of longs.
    private static void writeLongArray(DataOutputView out, Long[] values) throws IOException {
        out.writeInt(values.length);
        for (Long val : values) {
            out.writeLong(val);
        }
    }

    // Reads an array written by writeLongArray.
    private static Long[] readLongArray(DataInputView in) throws IOException {
        final int len = in.readInt();
        final Long[] values = new Long[len];
        for (int i = 0; i < len; i++) {
            values[i] = in.readLong();
        }
        return values;
    }

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(DorisSourceSplit split) throws IOException {

        // optimization: the splits lazily cache their own serialized form
        if (split.serializedFormCache != null) {
            return split.serializedFormCache;
        }

        final DataOutputSerializer out = SERIALIZER_CACHE.get();
        PartitionDefinition partDef = split.getPartitionDefinition();
        out.writeUTF(partDef.getDatabase());
        out.writeUTF(partDef.getTable());
        out.writeUTF(partDef.getBeAddress());
        writeLongArray(out, partDef.getTabletIds().toArray(new Long[]{}));
        // NOTE(review): writeUTF is limited to 65535 encoded bytes — confirm query plans can
        // never exceed that, otherwise serialization throws UTFDataFormatException.
        out.writeUTF(partDef.getQueryPlan());

        final byte[] result = out.getCopyOfBuffer();
        out.clear();

        // optimization: cache the serialized from, so we avoid the byte work during repeated
        // serialization
        split.serializedFormCache = result;

        return result;
    }

    @Override
    public DorisSourceSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version == 1) {
            return deserialize(serialized);
        }
        throw new IOException("Unknown version: " + version);
    }

    // Reads fields in the exact order serialize() wrote them.
    private DorisSourceSplit deserialize(byte[] serialized) throws IOException {
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
        final String database = in.readUTF();
        final String table = in.readUTF();
        final String beAddress = in.readUTF();
        Long[] vals = readLongArray(in);
        final Set<Long> tabletIds = new HashSet<>(Arrays.asList(vals));
        final String queryPlan = in.readUTF();
        PartitionDefinition partDef = new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
        return new DorisSourceSplit(partDef);
    }
}
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java
similarity index 67%
copy from
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
copy to
flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java
index 2aaec99..6369a08 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java
@@ -14,11 +14,20 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.split;

/**
 * State of the reader, essentially a mutable version of the {@link DorisSourceSplit}.
 *
 * <p>Doris splits carry no progress/offset information, so this state currently only wraps the
 * immutable split itself.
 **/
public class DorisSourceSplitState {

    private final DorisSourceSplit split;

    public DorisSourceSplitState(DorisSourceSplit split) {
        this.split = split;
    }

    /** Converts this state back into the split that gets checkpointed or re-assigned. */
    public DorisSourceSplit toDorisSourceSplit() {
        return split;
    }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
new file mode 100644
index 0000000..6f02446
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.split;
+
+import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An implementation of {@link RecordsWithSplitIds}.
+ * This is essentially a slim wrapper around the {@link ScalaValueReader} that
only adds
+ * information about the current split and the finished splits.
+ */
+public class DorisSplitRecords implements RecordsWithSplitIds<List> {
+
+ private final Set<String> finishedSplits;
+ private final ScalaValueReader scalaValueReader;
+ private String splitId;
+
+ public DorisSplitRecords(String splitId,
+ ScalaValueReader scalaValueReader,
+ Set<String> finishedSplits) {
+ this.splitId = splitId;
+ this.scalaValueReader = scalaValueReader;
+ this.finishedSplits = finishedSplits;
+ }
+
+ public static DorisSplitRecords forRecords(
+ final String splitId, final ScalaValueReader valueReader) {
+ return new DorisSplitRecords(splitId, valueReader,
Collections.emptySet());
+ }
+
+ public static DorisSplitRecords finishedSplit(final String splitId) {
+ return new DorisSplitRecords(null, null,
Collections.singleton(splitId));
+ }
+
+ @Nullable
+ @Override
+ public String nextSplit() {
+ // move the split along (hand out the current split id once, then null)
+ final String nextSplit = this.splitId;
+ this.splitId = null;
+ if (scalaValueReader == null || !scalaValueReader.hasNext()) {
+ return null;
+ }
+ return nextSplit;
+ }
+
+ @Nullable
+ @Override
+ public List nextRecordFromSplit() {
+ if (scalaValueReader != null && scalaValueReader.hasNext()) {
+ List next = scalaValueReader.next();
+ return next;
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return finishedSplits;
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 76a7bbc..7d6455e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -19,15 +19,12 @@ package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -37,7 +34,6 @@ import org.apache.flink.table.utils.TableSchemaUtils;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -159,6 +155,12 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
.defaultValue(true)
.withDescription("whether to enable the delete function");
+ private static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions
+ .key("source.use-old-api")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to read data using the new interface
defined according to the FLIP-27 specification,default false");
+
@Override
public String factoryIdentifier() {
return "doris"; // used for matching to `connector = '...'`
@@ -199,6 +201,8 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(SINK_LABEL_PREFIX);
options.add(SINK_BUFFER_SIZE);
options.add(SINK_BUFFER_COUNT);
+
+ options.add(SOURCE_USE_OLD_API);
return options;
}
@@ -244,7 +248,8 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
- .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+ .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
+ .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
return builder.build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 689aa47..0af8ad5 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -19,9 +19,12 @@ package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.RowDataDeserializationSchema;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.DorisSourceBuilder;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
@@ -30,6 +33,7 @@ import
org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
@@ -50,12 +54,14 @@ import java.util.List;
*/
public final class DorisDynamicTableSource implements ScanTableSource,
LookupTableSource {
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private TableSchema physicalSchema;
- private static final Logger LOG =
LoggerFactory.getLogger(DorisRowDataInputFormat.class);
- public DorisDynamicTableSource(DorisOptions options, DorisReadOptions
readOptions, TableSchema physicalSchema) {
+ public DorisDynamicTableSource(DorisOptions options,
+ DorisReadOptions readOptions,
+ TableSchema physicalSchema) {
this.options = options;
this.readOptions = readOptions;
this.physicalSchema = physicalSchema;
@@ -70,21 +76,31 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
- List<PartitionDefinition> dorisPartitions;
- try {
- dorisPartitions = RestService.findPartitions(options, readOptions,
LOG);
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris partitions");
+ if (readOptions.getUseOldApi()) {
+ List<PartitionDefinition> dorisPartitions;
+ try {
+ dorisPartitions = RestService.findPartitions(options,
readOptions, LOG);
+ } catch (DorisException e) {
+ throw new RuntimeException("Failed fetch doris partitions");
+ }
+ DorisRowDataInputFormat.Builder builder =
DorisRowDataInputFormat.builder()
+ .setFenodes(options.getFenodes())
+ .setUsername(options.getUsername())
+ .setPassword(options.getPassword())
+ .setTableIdentifier(options.getTableIdentifier())
+ .setPartitions(dorisPartitions)
+ .setReadOptions(readOptions)
+ .setRowType((RowType)
physicalSchema.toRowDataType().getLogicalType());
+ return InputFormatProvider.of(builder.build());
+ } else {
+ //Read data using the interface of the FLIP-27 specification
+ DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder()
+ .setDorisReadOptions(readOptions)
+ .setDorisOptions(options)
+ .setDeserializer(new
RowDataDeserializationSchema((RowType)
physicalSchema.toRowDataType().getLogicalType()))
+ .build();
+ return SourceProvider.of(build);
}
- DorisRowDataInputFormat.Builder builder =
DorisRowDataInputFormat.builder()
- .setFenodes(options.getFenodes())
- .setUsername(options.getUsername())
- .setPassword(options.getPassword())
- .setTableIdentifier(options.getTableIdentifier())
- .setPartitions(dorisPartitions)
- .setReadOptions(readOptions)
- .setRowType((RowType)
physicalSchema.toRowDataType().getLogicalType());
- return InputFormatProvider.of(builder.build());
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index be1e13d..fbcaeea 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.datastream.ScalaValueReader;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
@@ -30,12 +31,13 @@ import
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
@@ -43,9 +45,6 @@ import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* InputFormat for {@link DorisDynamicTableSource}.
*/
@@ -63,13 +62,16 @@ public class DorisRowDataInputFormat extends
RichInputFormat<RowData, DorisTable
private ScalaValueReader scalaValueReader;
private transient boolean hasNext;
- private RowType rowType;
+ private final DorisRowConverter rowConverter;
- public DorisRowDataInputFormat(DorisOptions options,
List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions,
RowType rowType) {
+ public DorisRowDataInputFormat(DorisOptions options,
+ List<PartitionDefinition> dorisPartitions,
+ DorisReadOptions readOptions,
+ RowType rowType) {
this.options = options;
this.dorisPartitions = dorisPartitions;
this.readOptions = readOptions;
- this.rowType = rowType;
+ this.rowConverter = new DorisRowConverter(rowType);
}
@Override
@@ -146,11 +148,7 @@ public class DorisRowDataInputFormat extends
RichInputFormat<RowData, DorisTable
return null;
}
List next = (List) scalaValueReader.next();
- GenericRowData genericRowData = new
GenericRowData(rowType.getFieldCount());
- for (int i = 0; i < next.size() && i < rowType.getFieldCount(); i++) {
- Object value = deserialize(rowType.getTypeAt(i), next.get(i));
- genericRowData.setField(i, value);
- }
+ RowData genericRowData = rowConverter.convert(next);
//update hasNext after we've read the record
hasNext = scalaValueReader.hasNext();
return genericRowData;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java
new file mode 100644
index 0000000..cf22009
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.doris.flink.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.junit.Assert.assertEquals;
+
+public class RowDataDeserializationSchemaTest {
+
+ @Test
+ public void deserializeTest() throws Exception {
+ List<String> records = Arrays.asList("flink","doris");
+ SimpleCollector collector = new SimpleCollector();
+ RowDataDeserializationSchema deserializationSchema = new
RowDataDeserializationSchema(PHYSICAL_TYPE);
+ for(String record : records){
+ deserializationSchema.deserialize(Arrays.asList(record),collector);
+ }
+
+ List<String> expected =
+ Arrays.asList(
+ "+I(flink)",
+ "+I(doris)");
+
+ List<String> actual =
+
collector.list.stream().map(Object::toString).collect(Collectors.toList());
+ assertEquals(expected, actual);
+ }
+
+ private static class SimpleCollector implements Collector<RowData> {
+ private List<RowData> list = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ list.add(record);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
new file mode 100644
index 0000000..3489399
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.deserialization.convert;
+
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+public class DorisRowConverterTest implements Serializable {
+
+ @Test
+ public void testConvert(){
+ ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("f1", DataTypes.NULL()),
+ Column.physical("f2", DataTypes.BOOLEAN()),
+ Column.physical("f3", DataTypes.FLOAT()),
+ Column.physical("f4", DataTypes.DOUBLE()),
+ Column.physical("f5",
DataTypes.INTERVAL(DataTypes.YEAR())),
+ Column.physical("f6",
DataTypes.INTERVAL(DataTypes.DAY())),
+ Column.physical("f7", DataTypes.TINYINT()),
+ Column.physical("f8", DataTypes.SMALLINT()),
+ Column.physical("f9", DataTypes.INT()),
+ Column.physical("f10", DataTypes.BIGINT()),
+ Column.physical("f11", DataTypes.DECIMAL(10,2)),
+ Column.physical("f12",
DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+ Column.physical("f13",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+ Column.physical("f14", DataTypes.DATE()),
+ Column.physical("f15", DataTypes.CHAR(1)),
+ Column.physical("f16", DataTypes.VARCHAR(256)));
+
+ DorisRowConverter converter = new DorisRowConverter((RowType)
SCHEMA.toPhysicalRowDataType().getLogicalType());
+
+ List record = Arrays.asList(null,"true",1.2,1.2345,24,10,1,32,64,128,
BigDecimal.valueOf(10.123),"2021-01-01 08:00:00","2021-01-01
08:00:00","2021-01-01","a","doris");
+ GenericRowData rowData = converter.convert(record);
+
Assert.assertEquals("+I(null,true,1.2,1.2345,24,10,1,32,64,128,10.12,2021-01-01
08:00:00,2021-01-01 08:00:00,2021-01-01,a,doris)",rowData.toString());
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
index 5984d91..8e08071 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java
@@ -19,7 +19,10 @@ package org.apache.doris.flink.sink;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Properties;
/**
@@ -35,19 +38,20 @@ public class OptionUtils {
DorisExecutionOptions.Builder builder =
DorisExecutionOptions.builder();
builder.setLabelPrefix("doris")
.setStreamLoadProp(properties)
- .setBufferSize(8*1024)
+ .setBufferSize(8 * 1024)
.setBufferCount(3)
.setDeletable(true)
.setCheckInterval(100)
.setMaxRetries(2);
return builder.build();
}
+
public static DorisExecutionOptions buildExecutionOptional(Properties
properties) {
DorisExecutionOptions.Builder builder =
DorisExecutionOptions.builder();
builder.setLabelPrefix("doris")
.setStreamLoadProp(properties)
- .setBufferSize(8*1024)
+ .setBufferSize(8 * 1024)
.setBufferCount(3)
.setDeletable(true)
.setCheckInterval(100)
@@ -56,6 +60,10 @@ public class OptionUtils {
}
public static DorisReadOptions buildDorisReadOptions() {
+ return dorisReadOptionsBuilder().build();
+ }
+
+ public static DorisReadOptions.Builder dorisReadOptionsBuilder() {
DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(false)
.setDeserializeQueueSize(64)
@@ -66,15 +74,20 @@ public class OptionUtils {
.setRequestReadTimeoutMs(10000)
.setRequestRetries(3)
.setRequestTabletSize(1024 * 1024);
- return builder.build();
+ return builder;
}
public static DorisOptions buildDorisOptions() {
DorisOptions.Builder builder = DorisOptions.builder();
- builder.setFenodes("local:8040")
- .setTableIdentifier("db_test.table_test")
- .setUsername("u_test")
- .setPassword("p_test");
+ builder.setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("");
return builder.build();
}
+
+ public static PartitionDefinition buildPartitionDef() {
+ HashSet<Long> tabletIds = new HashSet<>(Arrays.asList(100L));
+ return new PartitionDefinition("db", "table", "127.0.0.1:9060",
tabletIds, "");
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
new file mode 100644
index 0000000..aa5eb59
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Example tests for the {@link DorisSource}.
+ **/
+public class DorisSourceExampleTest {
+
+ @Test
+ public void testBoundedDorisSource() throws Exception {
+ DorisSource<List<?>> dorisSource =
DorisSourceBuilder.<List<?>>builder()
+ .setDorisOptions(OptionUtils.buildDorisOptions())
+ .setDorisReadOptions(OptionUtils.buildDorisReadOptions())
+ .setDeserializer(new SimpleListDeserializationSchema())
+ .build();
+
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris
Source")
+ .addSink(new PrintSinkFunction<>());
+ env.execute("Flink doris source test");
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
new file mode 100644
index 0000000..36cb4f9
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.source.split.DorisSourceSplitSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Unit tests for the {@link PendingSplitsCheckpointSerializer}.
+ */
+public class PendingSplitsCheckpointSerializerTest {
+
+ private static void assertCheckpointsEqual(
+ final PendingSplitsCheckpoint expected,
+ final PendingSplitsCheckpoint actual) {
+ Assert.assertEquals(expected.getSplits(), actual.getSplits());
+ }
+
+ @Test
+ public void serializeSplit() throws Exception {
+ final DorisSourceSplit split =
+ new DorisSourceSplit(OptionUtils.buildPartitionDef());
+ PendingSplitsCheckpoint checkpoint = new
PendingSplitsCheckpoint(Arrays.asList(split));
+
+ final PendingSplitsCheckpointSerializer splitSerializer = new
PendingSplitsCheckpointSerializer(DorisSourceSplitSerializer.INSTANCE);
+ byte[] serialized = splitSerializer.serialize(checkpoint);
+ PendingSplitsCheckpoint deserialize =
splitSerializer.deserialize(splitSerializer.getVersion(), serialized);
+
+ assertCheckpointsEqual(checkpoint, deserialize);
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
new file mode 100644
index 0000000..a44b96d
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the {@link DorisSourceReader}.
+ */
+public class DorisSourceReaderTest {
+
+ private static DorisSourceReader createReader(TestingReaderContext
context) {
+ return new DorisSourceReader<>(
+ OptionUtils.buildDorisOptions(),
+ OptionUtils.buildDorisReadOptions(),
+ new DorisRecordEmitter<>(new
SimpleListDeserializationSchema()),
+ context,
+ context.getConfiguration()
+ );
+ }
+
+ private static DorisSourceSplit createTestDorisSplit() throws IOException {
+ return new DorisSourceSplit(OptionUtils.buildPartitionDef());
+ }
+
+ @Test
+ public void testRequestSplitWhenNoSplitRestored() throws Exception {
+ final TestingReaderContext context = new TestingReaderContext();
+ final DorisSourceReader reader = createReader(context);
+
+ reader.start();
+ reader.close();
+ assertEquals(1, context.getNumSplitRequests());
+ }
+
+ @Test
+ public void testNoSplitRequestWhenSplitRestored() throws Exception {
+ final TestingReaderContext context = new TestingReaderContext();
+ final DorisSourceReader reader = createReader(context);
+
+ reader.addSplits(Collections.singletonList(createTestDorisSplit()));
+ reader.start();
+ reader.close();
+
+ assertEquals(0, context.getNumSplitRequests());
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java
new file mode 100644
index 0000000..253f2ea
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.reader;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A testing implementation of the {@link SourceReaderContext}.
+ */
+public class TestingReaderContext implements SourceReaderContext {
+
+ private final SourceReaderMetricGroup metrics;
+
+ private final Configuration config;
+
+ private final ArrayList<SourceEvent> sentEvents = new ArrayList<>();
+
+ private int numSplitRequests;
+
+ public TestingReaderContext() {
+ this(new Configuration(),
UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+ }
+
+ public TestingReaderContext(Configuration config, SourceReaderMetricGroup
metricGroup) {
+ this.config = config;
+ this.metrics = metricGroup;
+ }
+
+ @Override
+ public SourceReaderMetricGroup metricGroup() {
+ return metrics;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ @Override
+ public String getLocalHostName() {
+ return "localhost";
+ }
+
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
+ @Override
+ public void sendSplitRequest() {
+ numSplitRequests++;
+ }
+
+ @Override
+ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+ sentEvents.add(sourceEvent);
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+ }
+
+ public int getNumSplitRequests() {
+ return numSplitRequests;
+ }
+
+ public List<SourceEvent> getSentEvents() {
+ return new ArrayList<>(sentEvents);
+ }
+
+ public void clearSentEvents() {
+ sentEvents.clear();
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
new file mode 100644
index 0000000..b116539
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.source.split;
+
+import org.apache.doris.flink.sink.OptionUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the {@link DorisSourceSplitSerializer}.
+ */
+public class DorisSourceSplitSerializerTest {
+
+ @Test
+ public void serializeSplit() throws Exception {
+ // Round-trip a split through the serializer; the result must equal the input.
+ final DorisSourceSplit split =
+ new DorisSourceSplit(OptionUtils.buildPartitionDef());
+
+ DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split);
+ assertEquals(split, deSerialized);
+ }
+
+ // Serializes the split, then deserializes it with the serializer's current version.
+ private DorisSourceSplit serializeAndDeserializeSplit(DorisSourceSplit
split) throws Exception {
+ final DorisSourceSplitSerializer splitSerializer = new
DorisSourceSplitSerializer();
+ byte[] serialized = splitSerializer.serialize(split);
+ return splitSerializer.deserialize(splitSerializer.getVersion(),
serialized);
+ }
+
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java
similarity index 62%
copy from
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
copy to
flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java
index d9ec6e5..4873cb2 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java
@@ -14,20 +14,25 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.deserialization;
+package org.apache.doris.flink.source.split;
+import org.junit.Test;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import java.util.Collections;
-import java.util.List;
+import static org.junit.Assert.assertEquals;
+/**
+ * Unit tests for the {@link DorisSplitRecords} class.
+ */
+public class DorisSplitRecordsTest {
-public class SimpleListDeserializationSchema implements
DorisDeserializationSchema<List<?>> {
+ @Test
+ public void testEmptySplits() {
+ // A finished-split record set must report the given split id as finished.
+ final String split = "empty";
+ final DorisSplitRecords records =
DorisSplitRecords.finishedSplit(split);
- @Override
- public TypeInformation<List<?>> getProducedType() {
- return TypeInformation.of(new TypeHint<List<?>>() {
- });
+ assertEquals(Collections.singleton(split), records.finishedSplits());
}
+
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
new file mode 100644
index 0000000..533d56b
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.apache.doris.flink.utils.FactoryMocks;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class DorisDynamicTableSourceTest {
+
+ // With useOldApi=false the scan runtime provider must wrap the FLIP-27 DorisSource.
+ @Test
+ public void testDorisUseNewApi() {
+ DorisReadOptions.Builder builder =
OptionUtils.dorisReadOptionsBuilder();
+ builder.setUseOldApi(false);
+ final DorisDynamicTableSource actualDorisSource = new
DorisDynamicTableSource(OptionUtils.buildDorisOptions(), builder.build(),
TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ ScanTableSource.ScanRuntimeProvider provider =
+
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertDorisSource(provider);
+ }
+
+ // Without an explicit useOldApi setting, the FLIP-27 source is the default.
+ @Test
+ public void testDorisUseNewApiDefault() {
+ final DorisDynamicTableSource actualDorisSource = new
DorisDynamicTableSource(OptionUtils.buildDorisOptions(),
OptionUtils.buildDorisReadOptions(),
TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ ScanTableSource.ScanRuntimeProvider provider =
+
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertDorisSource(provider);
+ }
+
+ // With useOldApi=true the legacy InputFormat-based provider is used instead.
+ @Test
+ public void testDorisUseOldApi() {
+ DorisReadOptions.Builder builder =
OptionUtils.dorisReadOptionsBuilder();
+ builder.setUseOldApi(true);
+ final DorisDynamicTableSource actualDorisSource = new
DorisDynamicTableSource(OptionUtils.buildDorisOptions(), builder.build(),
TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA));
+ ScanTableSource.ScanRuntimeProvider provider =
+
actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertDorisInputFormat(provider);
+ }
+
+
+ // Asserts the provider is an InputFormatProvider wrapping DorisRowDataInputFormat.
+ private void assertDorisInputFormat(ScanTableSource.ScanRuntimeProvider
provider) {
+ assertThat(provider, instanceOf(InputFormatProvider.class));
+ final InputFormatProvider inputFormatProvider = (InputFormatProvider)
provider;
+
+ InputFormat<RowData, DorisTableInputSplit> inputFormat =
(InputFormat<RowData, DorisTableInputSplit>)
inputFormatProvider.createInputFormat();
+ assertThat(inputFormat, instanceOf(DorisRowDataInputFormat.class));
+ }
+
+
+ // Asserts the provider is a SourceProvider wrapping the FLIP-27 DorisSource.
+ private void assertDorisSource(ScanTableSource.ScanRuntimeProvider
provider) {
+ assertThat(provider, instanceOf(SourceProvider.class));
+ final SourceProvider sourceProvider = (SourceProvider) provider;
+
+ Source<RowData, DorisSourceSplit, PendingSplitsCheckpoint> source =
+ (Source<RowData, DorisSourceSplit, PendingSplitsCheckpoint>)
sourceProvider.createSource();
+ assertThat(source, instanceOf(DorisSource.class));
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java
new file mode 100644
index 0000000..28bbd44
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Utilities for testing instances usually created by {@link FactoryUtil}. */
+public final class FactoryMocks {
+
+ // Three-column physical schema (a STRING, b INT, c BOOLEAN) shared by table tests.
+ public static final ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.INT()),
+ Column.physical("c", DataTypes.BOOLEAN()));
+
+ // Physical row data type derived from SCHEMA.
+ public static final DataType PHYSICAL_DATA_TYPE =
SCHEMA.toPhysicalRowDataType();
+
+ // Logical RowType view of PHYSICAL_DATA_TYPE.
+ public static final RowType PHYSICAL_TYPE = (RowType)
PHYSICAL_DATA_TYPE.getLogicalType();
+
+ // Catalog identifier default.default.t1 for factory tests.
+ public static final ObjectIdentifier IDENTIFIER =
+ ObjectIdentifier.of("default", "default", "t1");
+
+ private FactoryMocks() {
+ // no instantiation
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]