This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bd4b13926ed [HUDI-5926] Improve cleaner parallelism (#8171)
bd4b13926ed is described below
commit bd4b13926ed4a0cdccc347167ac24556200d064a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 14 11:56:25 2023 -0700
[HUDI-5926] Improve cleaner parallelism (#8171)
---
.../main/java/org/apache/hudi/config/HoodieCleanConfig.java | 13 ++++++++++++-
.../apache/hudi/table/action/clean/CleanActionExecutor.java | 4 ++--
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index 30289e1acba..c1b66a371d1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
import javax.annotation.concurrent.Immutable;
+
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -120,7 +121,17 @@ public class HoodieCleanConfig extends HoodieConfig {
public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.cleaner.parallelism")
.defaultValue("200")
- .withDocumentation("Parallelism for the cleaning operation. Increase
this if cleaning becomes slow.");
+ .withDocumentation("This config controls the behavior of both the
cleaning plan and "
+ + "cleaning execution. Deriving the cleaning plan is parallelized at
the table "
+ + "partition level, i.e., each table partition is processed by one
Spark task to figure "
+ + "out the files to clean. The cleaner picks the configured
parallelism if the number "
+ + "of table partitions is larger than this configured value. The
parallelism is "
+ + "assigned to the number of table partitions if it is smaller than
the configured value. "
+ + "The clean execution, i.e., the file deletion, is parallelized at
file level, which "
+ + "is the unit of Spark task distribution. Similarly, the actual
parallelism cannot "
+ + "exceed the configured value if the number of files is larger. If
cleaning plan or "
+ + "execution is slow due to limited parallelism, you can increase
this to tune the "
+ "performance.");
public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS =
ConfigProperty
.key("hoodie.clean.allow.multiple")
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 9137eb436bb..01b8d191226 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -126,11 +126,11 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
*/
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan
cleanerPlan) {
int cleanerParallelism = Math.min(
- (int)
(cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
- context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of
partitions: " + config.getTableName());
+ context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of
table: " + config.getTableName());
Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()