This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 35c00daaf87 [HUDI-8074] Improve compaction operator shuffle rebalance
(#11757)
35c00daaf87 is described below
commit 35c00daaf871a6c1b87d6a440832d60f9b26ee14
Author: Manu <[email protected]>
AuthorDate: Wed Aug 14 08:42:47 2024 +0800
[HUDI-8074] Improve compaction operator shuffle rebalance (#11757)
---
.../org/apache/hudi/sink/compact/CompactionPlanEvent.java | 12 ++++++++++++
.../apache/hudi/sink/compact/CompactionPlanOperator.java | 14 +++++++++++++-
.../main/java/org/apache/hudi/sink/utils/Pipelines.java | 11 ++++++++---
3 files changed, 33 insertions(+), 4 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
index 4fd09d477f0..6ed31fa2352 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
@@ -32,6 +32,8 @@ public class CompactionPlanEvent implements Serializable {
private CompactionOperation operation;
+ private int index;
+
public CompactionPlanEvent() {
}
@@ -40,6 +42,12 @@ public class CompactionPlanEvent implements Serializable {
this.operation = operation;
}
+ public CompactionPlanEvent(String instantTime, CompactionOperation
operation, int index) {
+ this.compactionInstantTime = instantTime;
+ this.operation = operation;
+ this.index = index;
+ }
+
public void setCompactionInstantTime(String compactionInstantTime) {
this.compactionInstantTime = compactionInstantTime;
}
@@ -55,4 +63,8 @@ public class CompactionPlanEvent implements Serializable {
public CompactionOperation getOperation() {
return operation;
}
+
+ public int getIndex() {
+ return index;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 3cbd70a5f03..91da3a48f64 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -42,7 +42,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static java.util.stream.Collectors.toList;
@@ -146,8 +148,18 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
WriteMarkersFactory
.get(table.getConfig().getMarkersType(), table,
compactionInstantTime)
.deleteMarkerDir(table.getContext(),
table.getConfig().getMarkersDeleteParallelism());
+ Map<String, Integer> fileIdIndexMap = new HashMap<>();
+ int index = 0;
for (CompactionOperation operation : operations) {
- output.collect(new StreamRecord<>(new
CompactionPlanEvent(compactionInstantTime, operation)));
+ int operationIndex;
+ if (fileIdIndexMap.containsKey(operation.getFileId())) {
+ operationIndex = fileIdIndexMap.get(operation.getFileId());
+ } else {
+ operationIndex = index;
+ fileIdIndexMap.put(operation.getFileId(), operationIndex);
+ index++;
+ }
+ output.collect(new StreamRecord<>(new
CompactionPlanEvent(compactionInstantTime, operation, operationIndex)));
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index b3acd4cfa11..2c0779a8f7b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -415,9 +415,7 @@ public class Pipelines {
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.setMaxParallelism(1)
- // make the distribution strategy deterministic to avoid concurrent
modifications
- // on the same bucket files
- .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
+ .partitionCustom(new IndexPartitioner(), CompactionPlanEvent::getIndex)
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new CompactOperator(conf))
@@ -512,4 +510,11 @@ public class Pipelines {
private static final long serialVersionUID = 1L;
public static DummySink INSTANCE = new DummySink();
}
+
+ public static class IndexPartitioner implements Partitioner<Integer> {
+ @Override
+ public int partition(Integer key, int numPartitions) {
+ return key % numPartitions;
+ }
+ }
}