This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 539a48bdb7e [MINOR] Refactored `Pipelines::bulkInsert` (#12396)
539a48bdb7e is described below
commit 539a48bdb7eb8ab377cc2a39d35822f2c8525f8c
Author: Geser Dugarov <[email protected]>
AuthorDate: Thu Dec 12 13:15:36 2024 +0700
[MINOR] Refactored `Pipelines::bulkInsert` (#12396)
---
.../java/org/apache/hudi/sink/utils/Pipelines.java | 70 ++++++++++------------
1 file changed, 32 insertions(+), 38 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 2c0779a8f7b..a74329ee696 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -68,13 +68,12 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Utilities to generate all kinds of sub-pipelines.
@@ -110,8 +109,12 @@ public class Pipelines {
* @return the bulk insert data stream sink
*/
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType
rowType, DataStream<RowData> dataStream) {
- WriteOperatorFactory<RowData> operatorFactory =
BulkInsertWriteOperator.getFactory(conf, rowType);
- if (OptionsResolver.isBucketIndexType(conf)) {
+ // we need the same parallelism for all operators,
+ // equal to the number of write tasks, to avoid shuffles
+ final int PARALLELISM_VALUE = conf.getInteger(FlinkOptions.WRITE_TASKS);
+ final boolean isBucketIndexType = OptionsResolver.isBucketIndexType(conf);
+
+ if (isBucketIndexType) {
// TODO support bulk insert for consistent bucket index
if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
throw new HoodieException(
@@ -129,61 +132,52 @@ public class Pipelines {
Map<String, String> bucketIdToFileId = new HashMap<>();
dataStream = dataStream.partitionCustom(partitioner,
keyGen::getHoodieKey)
.map(record ->
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record,
indexKeys, numBuckets, needFixedFileIdSuffix), typeInfo)
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same
parallelism as write task to avoid shuffle
+ .setParallelism(PARALLELISM_VALUE);
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen =
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
dataStream = dataStream.transform("file_sorter", typeInfo,
sortOperatorGen.createSortOperator(conf))
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); //
same parallelism as write task to avoid shuffle
+ .setParallelism(PARALLELISM_VALUE);
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
- return dataStream
- .transform(opName("bucket_bulk_insert", conf),
TypeInformation.of(Object.class), operatorFactory)
- .uid(opUID("bucket_bulk_insert", conf))
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
- .addSink(DummySink.INSTANCE)
- .name("dummy");
- }
-
- final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
- if (partitionFields.length > 0) {
- RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ } else if (!FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.PARTITION_PATH_FIELD)) {
+ // if the table is not partitioned then we don't need any shuffles,
+ // and can add only the main write operator
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
-
// shuffle by partition keys
// use #partitionCustom instead of #keyBy to avoid duplicate sort
operations,
// see BatchExecutionUtils#applyBatchExecutionSettings for details.
Partitioner<String> partitioner = (key, channels) ->
KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
-
KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)),
channels);
+
KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM_VALUE),
channels);
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
dataStream = dataStream.partitionCustom(partitioner,
rowDataKeyGen::getPartitionPath);
}
+
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
- String[] sortFields = partitionFields;
- String operatorName = "sorter:(partition_key)";
- if
(conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY)) {
- String[] recordKeyFields =
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
- ArrayList<String> sortList = new
ArrayList<>(Arrays.asList(partitionFields));
- Collections.addAll(sortList, recordKeyFields);
- sortFields = sortList.toArray(new String[0]);
- operatorName = "sorter:(partition_key, record_key)";
- }
+ final boolean isNeededSortInput =
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY);
+ final String[] partitionFields =
FilePathUtils.extractPartitionKeys(conf);
+ final String[] recordKeyFields =
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+
+ // if sorting the input by record key is needed then append the record
keys to the partition keys
+ String[] sortFields = isNeededSortInput
+ ? Stream.concat(Arrays.stream(partitionFields),
Arrays.stream(recordKeyFields)).toArray(String[]::new)
+ : partitionFields;
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
sortFields);
- // sort by partition keys or (partition keys and record keys)
dataStream = dataStream
- .transform(operatorName,
- InternalTypeInfo.of(rowType),
- sortOperatorGen.createSortOperator(conf))
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ .transform(isNeededSortInput ? "sorter:(partition_key,
record_key)" : "sorter:(partition_key)",
+ InternalTypeInfo.of(rowType),
sortOperatorGen.createSortOperator(conf))
+ .setParallelism(PARALLELISM_VALUE);
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
}
+
+ // main write operator followed by a dummy sink at the end
return dataStream
- .transform(opName("hoodie_bulk_insert_write", conf),
- TypeInformation.of(Object.class),
- operatorFactory)
- // follow the parallelism of upstream operators to avoid shuffle
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .transform(opName(isBucketIndexType ? "bucket_bulk_insert" :
"hoodie_bulk_insert_write", conf),
+ TypeInformation.of(Object.class),
BulkInsertWriteOperator.getFactory(conf, rowType))
+ .uid(opUID("bucket_bulk_insert", conf))
+ .setParallelism(PARALLELISM_VALUE)
.addSink(DummySink.INSTANCE)
.name("dummy");
}