This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bd4b13926ed [HUDI-5926] Improve cleaner parallelism (#8171)
bd4b13926ed is described below

commit bd4b13926ed4a0cdccc347167ac24556200d064a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 14 11:56:25 2023 -0700

    [HUDI-5926] Improve cleaner parallelism (#8171)
---
 .../main/java/org/apache/hudi/config/HoodieCleanConfig.java | 13 ++++++++++++-
 .../apache/hudi/table/action/clean/CleanActionExecutor.java |  4 ++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index 30289e1acba..c1b66a371d1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
 
 import javax.annotation.concurrent.Immutable;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -120,7 +121,17 @@ public class HoodieCleanConfig extends HoodieConfig {
   public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = 
ConfigProperty
       .key("hoodie.cleaner.parallelism")
       .defaultValue("200")
-      .withDocumentation("Parallelism for the cleaning operation. Increase 
this if cleaning becomes slow.");
+      .withDocumentation("This config controls the behavior of both the 
cleaning plan and "
+          + "cleaning execution. Deriving the cleaning plan is parallelized at 
the table "
+          + "partition level, i.e., each table partition is processed by one 
Spark task to figure "
+          + "out the files to clean. The cleaner picks the configured 
parallelism if the number "
+          + "of table partitions is larger than this configured value. The 
parallelism is "
+          + "assigned to the number of table partitions if it is smaller than 
the configured value. "
+          + "The clean execution, i.e., the file deletion, is parallelized at 
file level, which "
+          + "is the unit of Spark task distribution. Similarly, the actual 
parallelism cannot "
+          + "exceed the configured value if the number of files is larger. If 
cleaning plan or "
+          + "execution is slow due to limited parallelism, you can increase 
this to tune the "
+          "performance.");
 
   public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = 
ConfigProperty
       .key("hoodie.clean.allow.multiple")
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 9137eb436bb..01b8d191226 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -126,11 +126,11 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
    */
   List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan 
cleanerPlan) {
     int cleanerParallelism = Math.min(
-        (int) 
(cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of 
partitions: " + config.getTableName());
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of 
table: " + config.getTableName());
 
     Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
         cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()

Reply via email to