This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bc31372 [HUDI-2106] Fix flink batch compaction bug while user don't
set compaction tasks (#3192)
bc31372 is described below
commit bc313727e3e89640edad85364022e057c9864ee9
Author: swuferhong <[email protected]>
AuthorDate: Tue Jul 6 09:10:37 2021 +0800
[HUDI-2106] Fix flink batch compaction bug while user don't set compaction
tasks (#3192)
---
.../main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java | 4 ++--
.../main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 3 ++-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 699f078..26ad824 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -86,8 +86,8 @@ public class FlinkCompactionConfig extends Configuration {
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per
compaction (both read and write) for batching compaction, default 512000M.",
required = false)
public Long compactionTargetIo = 512000L;
- @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of
tasks that do actual compaction, default is 10", required = false)
- public Integer compactionTasks = 10;
+ @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of
tasks that do actual compaction, default is -1", required = false)
+ public Integer compactionTasks = -1;
/**
* Transforms a {@code HoodieFlinkCompaction.config} into {@code
Configuration}.
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 3e0a437..8ee6c11 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -112,7 +112,8 @@ public class HoodieFlinkCompactor {
}
// get compactionParallelism.
- int compactionParallelism =
Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS),
compactionPlan.getOperations().size());
+ int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS)
== -1
+ ? compactionPlan.getOperations().size() :
conf.getInteger(FlinkOptions.COMPACTION_TASKS);
env.addSource(new CompactionPlanSourceFunction(table, instant,
compactionPlan, compactionInstantTime))
.name("compaction_source")