This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 47687763102 [HUDI-6990] Configurable clustering group read task
parallelism (#9925)
47687763102 is described below
commit 47687763102d4df43609577c08ce6d83ea94d297
Author: ksmou <[email protected]>
AuthorDate: Sat Nov 4 13:36:20 2023 +0800
[HUDI-6990] Configurable clustering group read task parallelism (#9925)
---
.../main/java/org/apache/hudi/config/HoodieClusteringConfig.java | 7 +++++++
.../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 4 ++++
.../run/strategy/MultipleSparkJobExecutionStrategy.java | 8 ++++++--
3 files changed, 17 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e8eea235168..d380c228d8a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -161,6 +161,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
+ "value will let the clustering job run faster, while it will put
additional pressure on the "
+ "execution engines to manage more concurrent running jobs.");
+ public static final ConfigProperty<Integer>
CLUSTERING_GROUP_READ_PARALLELISM = ConfigProperty
+ .key("hoodie.clustering.group.read.parallelism")
+ .defaultValue(20)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Maximum parallelism when Spark reads
records from a clustering group.");
+
public static final ConfigProperty<String>
PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX +
"daybased.skipfromlatest.partitions")
.defaultValue("0")
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index af65420ce44..268438c172e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1795,6 +1795,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return
getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE);
}
+ public int getClusteringGroupReadParallelism() {
+ return getInt(HoodieClusteringConfig.CLUSTERING_GROUP_READ_PARALLELISM);
+ }
+
/**
* index properties.
*/
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index bf0511138d5..4ae8552f6c0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -296,7 +296,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
Option<String[]> partitionFields = tableConfig.getPartitionFields();
- return HoodieJavaRDD.of(jsc.parallelize(clusteringOps,
clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
+ int readParallelism =
Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
+
+ return HoodieJavaRDD.of(jsc.parallelize(clusteringOps,
readParallelism).mapPartitions(clusteringOpsPartition -> {
List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new
SparkTaskContextSupplier(), config);
@@ -352,7 +354,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
Option<String[]> partitionFields = tableConfig.getPartitionFields();
- return HoodieJavaRDD.of(jsc.parallelize(clusteringOps,
clusteringOps.size())
+ int readParallelism =
Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
+
+ return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
.mapPartitions(clusteringOpsPartition -> {
List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new
ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {