This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc31372  [HUDI-2106] Fix flink batch compaction bug while user don't 
set compaction tasks (#3192)
bc31372 is described below

commit bc313727e3e89640edad85364022e057c9864ee9
Author: swuferhong <[email protected]>
AuthorDate: Tue Jul 6 09:10:37 2021 +0800

    [HUDI-2106] Fix flink batch compaction bug while user don't set compaction 
tasks (#3192)
---
 .../main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java | 4 ++--
 .../main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java  | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 699f078..26ad824 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -86,8 +86,8 @@ public class FlinkCompactionConfig extends Configuration {
   @Parameter(names = {"--compaction-target-io"}, description = "Target IO per 
compaction (both read and write) for batching compaction, default 512000M.", 
required = false)
   public Long compactionTargetIo = 512000L;
 
-  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of 
tasks that do actual compaction, default is 10", required = false)
-  public Integer compactionTasks = 10;
+  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of 
tasks that do actual compaction, default is -1", required = false)
+  public Integer compactionTasks = -1;
 
   /**
    * Transforms a {@code HoodieFlinkCompaction.config} into {@code 
Configuration}.
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 3e0a437..8ee6c11 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -112,7 +112,8 @@ public class HoodieFlinkCompactor {
     }
 
     // get compactionParallelism.
-    int compactionParallelism = 
Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), 
compactionPlan.getOperations().size());
+    int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) 
== -1
+            ? compactionPlan.getOperations().size() : 
conf.getInteger(FlinkOptions.COMPACTION_TASKS);
 
     env.addSource(new CompactionPlanSourceFunction(table, instant, 
compactionPlan, compactionInstantTime))
         .name("compaction_source")

Reply via email to