This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bf9c5f  [HUDI-3728] Set the sort operator parallelism for flink 
bucket bulk insert (#5154)
3bf9c5f is described below

commit 3bf9c5ffe80e84e9b0ba34e115e1a7a417f343e8
Author: Danny Chan <[email protected]>
AuthorDate: Tue Mar 29 09:52:35 2022 +0800

    [HUDI-3728] Set the sort operator parallelism for flink bucket bulk insert 
(#5154)
---
 .../sink/bucket/BucketStreamWriteFunction.java     | 67 +++++++++-------------
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  6 +-
 2 files changed, 30 insertions(+), 43 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 7e4cf68..1456e88 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -18,16 +18,12 @@
 
 package org.apache.hudi.sink.bucket;
 
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.sink.StreamWriteFunction;
-import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,12 +35,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static java.util.stream.Collectors.toList;
-
 /**
  * A stream write function with bucket hash index.
  *
@@ -58,18 +51,14 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BucketStreamWriteFunction.class);
 
-  private int maxParallelism;
-
   private int parallelism;
 
   private int bucketNum;
 
-  private transient HoodieFlinkTable<?> table;
-
   private String indexKeyFields;
 
   /**
-   * BucketID should be load in this task.
+   * BucketID should be loaded in this task.
    */
   private Set<Integer> bucketToLoad;
 
@@ -87,6 +76,11 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
   private Set<String> incBucketIndex;
 
   /**
+   * Returns whether this is an empty table.
+   */
+  private boolean isEmptyTable;
+
+  /**
    * Constructs a BucketStreamWriteFunction.
    *
    * @param config The config options
@@ -102,17 +96,15 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
     this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
-    this.bucketToLoad = new HashSet<>();
+    this.bucketToLoad = getBucketToLoad();
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
-    getBucketToLoad();
+    this.isEmptyTable = 
!this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
   }
 
   @Override
   public void initializeState(FunctionInitializationContext context) throws 
Exception {
     super.initializeState(context);
-    this.table = this.writeClient.getHoodieTable();
   }
 
   @Override
@@ -129,19 +121,19 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
     final HoodieRecordLocation location;
 
     bootstrapIndexIfNeed(partition);
-    Map<Integer, String> bucketToFileIdMap = bucketIndex.get(partition);
+    Map<Integer, String> bucketToFileId = 
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
     final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, 
indexKeyFields, this.bucketNum);
-    final String partitionBucketId = 
BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
+    final String bucketId = partition + bucketNum;
 
-    if (incBucketIndex.contains(partitionBucketId)) {
-      location = new HoodieRecordLocation("I", 
bucketToFileIdMap.get(bucketNum));
-    } else if (bucketToFileIdMap.containsKey(bucketNum)) {
-      location = new HoodieRecordLocation("U", 
bucketToFileIdMap.get(bucketNum));
+    if (incBucketIndex.contains(bucketId)) {
+      location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
+    } else if (bucketToFileId.containsKey(bucketNum)) {
+      location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
     } else {
       String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
       location = new HoodieRecordLocation("I", newFileId);
-      bucketToFileIdMap.put(bucketNum,newFileId);
-      incBucketIndex.add(partitionBucketId);
+      bucketToFileId.put(bucketNum, newFileId);
+      incBucketIndex.add(bucketId);
     }
     record.unseal();
     record.setCurrentLocation(location);
@@ -153,39 +145,32 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
    * Bootstrap bucket info from existing file system,
    * bucketNum % totalParallelism == this taskID belongs to this task.
    */
-  private void getBucketToLoad() {
+  private Set<Integer> getBucketToLoad() {
+    Set<Integer> bucketToLoad = new HashSet<>();
     for (int i = 0; i < bucketNum; i++) {
       int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
       if (partitionOfBucket == taskID) {
-        LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
-                + "Current parallelism: %s , Max parallelism: %s , Current 
task id: %s",
-            i, parallelism, maxParallelism, taskID));
         bucketToLoad.add(i);
       }
     }
-    bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad 
contains %s", bucket)));
+    LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID, 
parallelism, bucketToLoad);
+    return bucketToLoad;
   }
 
   /**
    * Get partition_bucket -> fileID mapping from the existing hudi table.
    * This is a required operation for each restart to avoid having duplicate 
file ids for one bucket.
    */
-  private void bootstrapIndexIfNeed(String partition) throws IOException {
-    if (bucketIndex.containsKey(partition)) {
-      return;
-    }
-    Option<HoodieInstant> latestCommitTime = 
table.getHoodieView().getTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommitTime.isPresent()) {
-      bucketIndex.put(partition, new HashMap<>());
+  private void bootstrapIndexIfNeed(String partition) {
+    if (isEmptyTable || bucketIndex.containsKey(partition)) {
       return;
     }
-    LOG.info(String.format("Loading Hoodie Table %s, with path %s", 
table.getMetaClient().getTableConfig().getTableName(),
-        table.getMetaClient().getBasePath() + "/" + partition));
+    LOG.info(String.format("Loading Hoodie Table %s, with path %s", 
this.metaClient.getTableConfig().getTableName(),
+        this.metaClient.getBasePath() + "/" + partition));
 
     // Load existing fileID belongs to this task
     Map<Integer, String> bucketToFileIDMap = new HashMap<>();
-    List<FileSlice> fileSlices = 
table.getHoodieView().getLatestFileSlices(partition).collect(toList());
-    for (FileSlice fileSlice : fileSlices) {
+    
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice
 -> {
       String fileID = fileSlice.getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
       if (bucketToLoad.contains(bucketNumber)) {
@@ -198,7 +183,7 @@ public class BucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
           bucketToFileIDMap.put(bucketNumber, fileID);
         }
       }
-    }
+    });
     bucketIndex.put(partition, bucketToFileIDMap);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 1992edd..9f0a817 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -98,10 +98,12 @@ public class Pipelines {
       InternalTypeInfo<RowData> typeInfo = 
InternalTypeInfo.of(rowTypeWithFileId);
       dataStream = dataStream.partitionCustom(partitioner, 
rowDataKeyGen::getRecordKey)
           .map(record -> 
BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, 
indexKeyFields, bucketNum),
-              typeInfo);
+              typeInfo)
+          .setParallelism(dataStream.getParallelism()); // same parallelism as 
source
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = 
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
-        dataStream = dataStream.transform("file_sorter", typeInfo, 
sortOperatorGen.createSortOperator());
+        dataStream = dataStream.transform("file_sorter", typeInfo, 
sortOperatorGen.createSortOperator())
+            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // 
same parallelism as write task to avoid shuffle
         ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }

Reply via email to