This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c00daaf87 [HUDI-8074] Improve compaction operator shuffle rebalance
(#11757)
35c00daaf87 is described below

commit 35c00daaf871a6c1b87d6a440832d60f9b26ee14
Author: Manu <[email protected]>
AuthorDate: Wed Aug 14 08:42:47 2024 +0800

    [HUDI-8074] Improve compaction operator shuffle rebalance (#11757)
---
 .../org/apache/hudi/sink/compact/CompactionPlanEvent.java  | 12 ++++++++++++
 .../apache/hudi/sink/compact/CompactionPlanOperator.java   | 14 +++++++++++++-
 .../main/java/org/apache/hudi/sink/utils/Pipelines.java    | 11 ++++++++---
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
index 4fd09d477f0..6ed31fa2352 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
@@ -32,6 +32,8 @@ public class CompactionPlanEvent implements Serializable {
 
   private CompactionOperation operation;
 
+  private int index;
+
   public CompactionPlanEvent() {
   }
 
@@ -40,6 +42,12 @@ public class CompactionPlanEvent implements Serializable {
     this.operation = operation;
   }
 
+  public CompactionPlanEvent(String instantTime, CompactionOperation 
operation, int index) {
+    this.compactionInstantTime = instantTime;
+    this.operation = operation;
+    this.index = index;
+  }
+
   public void setCompactionInstantTime(String compactionInstantTime) {
     this.compactionInstantTime = compactionInstantTime;
   }
@@ -55,4 +63,8 @@ public class CompactionPlanEvent implements Serializable {
   public CompactionOperation getOperation() {
     return operation;
   }
+
+  public int getIndex() {
+    return index;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 3cbd70a5f03..91da3a48f64 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -42,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static java.util.stream.Collectors.toList;
 
@@ -146,8 +148,18 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
       WriteMarkersFactory
           .get(table.getConfig().getMarkersType(), table, 
compactionInstantTime)
           .deleteMarkerDir(table.getContext(), 
table.getConfig().getMarkersDeleteParallelism());
+      Map<String, Integer> fileIdIndexMap = new HashMap<>();
+      int index = 0;
       for (CompactionOperation operation : operations) {
-        output.collect(new StreamRecord<>(new 
CompactionPlanEvent(compactionInstantTime, operation)));
+        int operationIndex;
+        if (fileIdIndexMap.containsKey(operation.getFileId())) {
+          operationIndex = fileIdIndexMap.get(operation.getFileId());
+        } else {
+          operationIndex = index;
+          fileIdIndexMap.put(operation.getFileId(), operationIndex);
+          index++;
+        }
+        output.collect(new StreamRecord<>(new 
CompactionPlanEvent(compactionInstantTime, operation, operationIndex)));
       }
     }
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index b3acd4cfa11..2c0779a8f7b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -415,9 +415,7 @@ public class Pipelines {
             new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
         .setMaxParallelism(1)
-        // make the distribution strategy deterministic to avoid concurrent 
modifications
-        // on the same bucket files
-        .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
+        .partitionCustom(new IndexPartitioner(), CompactionPlanEvent::getIndex)
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new CompactOperator(conf))
@@ -512,4 +510,11 @@ public class Pipelines {
     private static final long serialVersionUID = 1L;
     public static DummySink INSTANCE = new DummySink();
   }
+
+  public static class IndexPartitioner implements Partitioner<Integer> {
+    @Override
+    public int partition(Integer key, int numPartitions) {
+      return key % numPartitions;
+    }
+  }
 }

Reply via email to