This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 00d50e91abe [HUDI-6293] Make HoodieFlinkCompactor's parallelism of
compact_task more reasonable (#8854)
00d50e91abe is described below
commit 00d50e91abe24aba31daa2fe2806de5414f03c77
Author: Dongsj <[email protected]>
AuthorDate: Thu Jun 1 11:38:27 2023 +0800
[HUDI-6293] Make HoodieFlinkCompactor's parallelism of compact_task more
reasonable (#8854)
Co-authored-by: dongsj <[email protected]>
---
.../java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 63dfd26c4ac..e396897dc7e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -274,10 +274,12 @@ public class HoodieFlinkCompactor {
List<HoodieInstant> instants =
compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
+ int totalOperations =
Math.toIntExact(compactionPlans.stream().mapToLong(pair ->
pair.getRight().getOperations().size()).sum());
+
// get compactionParallelism.
int compactionParallelism =
conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
- ? Math.toIntExact(compactionPlans.stream().mapToLong(pair ->
pair.getRight().getOperations().size()).sum())
- : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+ ? totalOperations
+ : Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS),
totalOperations);
LOG.info("Start to compaction for instant " + compactionInstantTimes);