This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47687763102 [HUDI-6990] Configurable clustering group read task parallelism  (#9925)
47687763102 is described below

commit 47687763102d4df43609577c08ce6d83ea94d297
Author: ksmou <[email protected]>
AuthorDate: Sat Nov 4 13:36:20 2023 +0800

    [HUDI-6990] Configurable clustering group read task parallelism  (#9925)
---
 .../main/java/org/apache/hudi/config/HoodieClusteringConfig.java  | 7 +++++++
 .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java   | 4 ++++
 .../run/strategy/MultipleSparkJobExecutionStrategy.java           | 8 ++++++--
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e8eea235168..d380c228d8a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -161,6 +161,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
           + "value will let the clustering job run faster, while it will give 
additional pressure to the "
           + "execution engines to manage more concurrent running jobs.");
 
+  public static final ConfigProperty<Integer> 
CLUSTERING_GROUP_READ_PARALLELISM = ConfigProperty
+      .key("hoodie.clustering.group.read.parallelism")
+      .defaultValue(20)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Maximum number of parallelism when Spark read 
records from clustering group.");
+
   public static final ConfigProperty<String> 
PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + 
"daybased.skipfromlatest.partitions")
       .defaultValue("0")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index af65420ce44..268438c172e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1795,6 +1795,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE);
   }
 
+  public int getClusteringGroupReadParallelism() {
+    return getInt(HoodieClusteringConfig.CLUSTERING_GROUP_READ_PARALLELISM);
+  }
+
   /**
    * index properties.
    */
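
A quick sketch of the accessor's expected fallback (assumed behavior, not shown in the diff): when the key is never set, the builder materializes ConfigProperty defaults, so the getter should return the declared default of 20. The base path below is hypothetical.

    import org.apache.hudi.config.HoodieWriteConfig;

    public class DefaultParallelismCheck {
      public static void main(String[] args) {
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi/example_table")  // hypothetical base path
            .build();
        // hoodie.clustering.group.read.parallelism was never set, so this
        // should fall back to the default declared in HoodieClusteringConfig.
        System.out.println(cfg.getClusteringGroupReadParallelism()); // expected: 20
      }
    }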
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index bf0511138d5..4ae8552f6c0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -296,7 +296,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
     Option<String[]> partitionFields = tableConfig.getPartitionFields();
 
-    return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
+    int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
+
+    return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions(clusteringOpsPartition -> {
       List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
       clusteringOpsPartition.forEachRemaining(clusteringOp -> {
         long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
@@ -352,7 +354,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
     Option<String[]> partitionFields = tableConfig.getPartitionFields();
 
-    return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size())
+    int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
+
+    return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
         .mapPartitions(clusteringOpsPartition -> {
           List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new 
ArrayList<>();
           clusteringOpsPartition.forEachRemaining(clusteringOp -> {
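
In both hunks the behavioral change is the same: the RDD used to get one partition per clustering operation, and the configured value now acts as an upper bound. A standalone illustration of the capping arithmetic, with hypothetical sizes:

    public class ReadParallelismCap {
      public static void main(String[] args) {
        int configured = 20;  // hoodie.clustering.group.read.parallelism default
        int numOps = 500;     // hypothetical number of clustering operations
        // Previously the RDD was parallelized with numOps partitions (500);
        // after this commit it is capped at the configured value, while a
        // group smaller than the cap still gets one read task per operation.
        int readParallelism = Math.min(configured, numOps);
        System.out.println(readParallelism); // 20
      }
    }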
