This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 539a48bdb7e [MINOR] Refactored `Pipelines::bulkInsert` (#12396)
539a48bdb7e is described below

commit 539a48bdb7eb8ab377cc2a39d35822f2c8525f8c
Author: Geser Dugarov <[email protected]>
AuthorDate: Thu Dec 12 13:15:36 2024 +0700

    [MINOR] Refactored `Pipelines::bulkInsert` (#12396)
---
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 70 ++++++++++------------
 1 file changed, 32 insertions(+), 38 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 2c0779a8f7b..a74329ee696 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -68,13 +68,12 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Utilities to generate all kinds of sub-pipelines.
@@ -110,8 +109,12 @@ public class Pipelines {
    * @return the bulk insert data stream sink
    */
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType 
rowType, DataStream<RowData> dataStream) {
-    WriteOperatorFactory<RowData> operatorFactory = 
BulkInsertWriteOperator.getFactory(conf, rowType);
-    if (OptionsResolver.isBucketIndexType(conf)) {
+    // we need same parallelism for all operators,
+    // which is equal to write tasks number, to avoid shuffles
+    final int PARALLELISM_VALUE = conf.getInteger(FlinkOptions.WRITE_TASKS);
+    final boolean isBucketIndexType = OptionsResolver.isBucketIndexType(conf);
+
+    if (isBucketIndexType) {
       // TODO support bulk insert for consistent bucket index
       if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
         throw new HoodieException(
@@ -129,61 +132,52 @@ public class Pipelines {
       Map<String, String> bucketIdToFileId = new HashMap<>();
       dataStream = dataStream.partitionCustom(partitioner, 
keyGen::getHoodieKey)
           .map(record -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, 
indexKeys, numBuckets, needFixedFileIdSuffix), typeInfo)
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same 
parallelism as write task to avoid shuffle
+          .setParallelism(PARALLELISM_VALUE);
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = 
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
         dataStream = dataStream.transform("file_sorter", typeInfo, 
sortOperatorGen.createSortOperator(conf))
-            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // 
same parallelism as write task to avoid shuffle
+            .setParallelism(PARALLELISM_VALUE);
         ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
-      return dataStream
-          .transform(opName("bucket_bulk_insert", conf), 
TypeInformation.of(Object.class), operatorFactory)
-          .uid(opUID("bucket_bulk_insert", conf))
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
-          .addSink(DummySink.INSTANCE)
-          .name("dummy");
-    }
-
-    final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
-    if (partitionFields.length > 0) {
-      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+    } else if (!FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.PARTITION_PATH_FIELD)) {
+      // if table is not partitioned then we don't need any shuffles,
+      // and could add main write operator only
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
-
         // shuffle by partition keys
         // use #partitionCustom instead of #keyBy to avoid duplicate sort 
operations,
         // see BatchExecutionUtils#applyBatchExecutionSettings for details.
         Partitioner<String> partitioner = (key, channels) -> 
KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
-            
KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)),
 channels);
+            
KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM_VALUE), 
channels);
+        RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
         dataStream = dataStream.partitionCustom(partitioner, 
rowDataKeyGen::getPartitionPath);
       }
+
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
-        String[] sortFields = partitionFields;
-        String operatorName = "sorter:(partition_key)";
-        if 
(conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY)) {
-          String[] recordKeyFields = 
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
-          ArrayList<String> sortList = new 
ArrayList<>(Arrays.asList(partitionFields));
-          Collections.addAll(sortList, recordKeyFields);
-          sortFields = sortList.toArray(new String[0]);
-          operatorName = "sorter:(partition_key, record_key)";
-        }
+        final boolean isNeededSortInput = 
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY);
+        final String[] partitionFields = 
FilePathUtils.extractPartitionKeys(conf);
+        final String[] recordKeyFields = 
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+
+        // if sort input by record key is needed then add record keys to 
partition keys
+        String[] sortFields = isNeededSortInput
+            ? Stream.concat(Arrays.stream(partitionFields), 
Arrays.stream(recordKeyFields)).toArray(String[]::new)
+            : partitionFields;
         SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, 
sortFields);
-        // sort by partition keys or (partition keys and record keys)
         dataStream = dataStream
-            .transform(operatorName,
-                InternalTypeInfo.of(rowType),
-                sortOperatorGen.createSortOperator(conf))
-            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+            .transform(isNeededSortInput ? "sorter:(partition_key, 
record_key)" : "sorter:(partition_key)",
+                InternalTypeInfo.of(rowType), 
sortOperatorGen.createSortOperator(conf))
+            .setParallelism(PARALLELISM_VALUE);
         ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
     }
+
+    // main write operator with following dummy sink in the end
     return dataStream
-        .transform(opName("hoodie_bulk_insert_write", conf),
-            TypeInformation.of(Object.class),
-            operatorFactory)
-        // follow the parallelism of upstream operators to avoid shuffle
-        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+        .transform(opName(isBucketIndexType ? "bucket_bulk_insert" : 
"hoodie_bulk_insert_write", conf),
+            TypeInformation.of(Object.class), 
BulkInsertWriteOperator.getFactory(conf, rowType))
+        .uid(opUID("bucket_bulk_insert", conf))
+        .setParallelism(PARALLELISM_VALUE)
         .addSink(DummySink.INSTANCE)
         .name("dummy");
   }

Reply via email to