This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 549e849 [improvement](flink-connector) DataSourceFunction read doris
supports parallel (#7232)
549e849 is described below
commit 549e84940091e306b6be59665bcaa746e2197caa
Author: wudi <[email protected]>
AuthorDate: Wed Dec 15 16:21:29 2021 +0800
[improvement](flink-connector) DataSourceFunction read doris supports
parallel (#7232)
The previous DorisSourceFunction extended RichSourceFunction.
As a result, its parallelism was always 1, regardless of the
parallelism configured for the Flink job.
It now extends RichParallelSourceFunction.
When Flink runs with a parallelism greater than 1, the Doris partitions
are distributed across the parallel subtasks.
For example, with dorisPartitions.size = 10 and flink.parallelism = 4,
partition i is assigned to subtask (i % 4), so the work is split as follows:
task0: dorisPartitions[0],[4],[8]
task1: dorisPartitions[1],[5],[9]
task2: dorisPartitions[2],[6]
task3: dorisPartitions[3],[7]
---
.../flink/datastream/DorisSourceFunction.java | 32 +++++++++++++++++++---
.../org/apache/doris/flink/rest/RestService.java | 4 +--
.../doris/flink/table/DorisDynamicTableSource.java | 2 +-
.../org/apache/doris/flink/DorisSourceExample.java | 2 +-
4 files changed, 32 insertions(+), 8 deletions(-)
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 85f5f6b..edde953 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -20,12 +20,14 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +38,7 @@ import java.util.List;
* DorisSource
**/
-public class DorisSourceFunction extends RichSourceFunction<List<?>>
implements ResultTypeQueryable<List<?>> {
+public class DorisSourceFunction extends RichParallelSourceFunction<List<?>>
implements ResultTypeQueryable<List<?>> {
private static final Logger logger =
LoggerFactory.getLogger(DorisSourceFunction.class);
@@ -45,23 +47,45 @@ public class DorisSourceFunction extends
RichSourceFunction<List<?>> implements
private final DorisReadOptions readOptions;
private transient volatile boolean isRunning;
private List<PartitionDefinition> dorisPartitions;
+ private List<PartitionDefinition> taskDorisPartitions =
Lists.newArrayList();
public DorisSourceFunction(DorisStreamOptions streamOptions,
DorisDeserializationSchema<List<?>> deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
+ try {
+ this.dorisPartitions = RestService.findPartitions(options,
readOptions, logger);
+ logger.info("Doris partitions size {}", dorisPartitions.size());
+ } catch (DorisException e) {
+ throw new RuntimeException("Failed fetch doris partitions");
+ }
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.isRunning = true;
- this.dorisPartitions = RestService.findPartitions(options,
readOptions, logger);
+ assignTaskPartitions();
+ }
+
+ /**
+ * Assign patitions to each task.
+ */
+ private void assignTaskPartitions() {
+ int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ int totalTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ for (int i = 0; i < dorisPartitions.size(); i++) {
+ if (i % totalTasks == taskIndex) {
+ taskDorisPartitions.add(dorisPartitions.get(i));
+ }
+ }
+ logger.info("subtask {} process {} partitions ", taskIndex,
taskDorisPartitions.size());
}
@Override
public void run(SourceContext<List<?>> sourceContext) {
- for (PartitionDefinition partitions : dorisPartitions) {
+ for (PartitionDefinition partitions : taskDorisPartitions) {
try (ScalaValueReader scalaValueReader = new
ScalaValueReader(partitions, options, readOptions)) {
while (isRunning && scalaValueReader.hasNext()) {
List<?> next = scalaValueReader.next();
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 1e6310c..82e01e0 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -444,11 +444,11 @@ public class RestService implements Serializable {
}
/**
- * find Doris RDD partitions from Doris FE.
+ * find Doris partitions from Doris FE.
*
* @param options configuration of request
* @param logger {@link Logger}
- * @return an list of Doris RDD partitions
+ * @return an list of Doris partitions
* @throws DorisException throw when find partition failed
*/
public static List<PartitionDefinition> findPartitions(DorisOptions
options, DorisReadOptions readOptions, Logger logger) throws DorisException {
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 43d9e5f..0262677 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -72,7 +72,7 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
try {
dorisPartitions = RestService.findPartitions(options, readOptions,
LOG);
} catch (DorisException e) {
- throw new RuntimeException("can not fetch partitions");
+ throw new RuntimeException("Failed fetch doris partitions");
}
DorisRowDataInputFormat.Builder builder =
DorisRowDataInputFormat.builder()
.setFenodes(options.getFenodes())
diff --git
a/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
b/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index eb1d819..35857dc 100644
---
a/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++
b/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -26,7 +26,7 @@ public class DorisSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1); // source only supports parallelism of 1
+ env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]