This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b36e7c45990 [HUDI-6293] Make HoodieClusteringJob's parallelism of
clustering_task more reasonable (#8866)
b36e7c45990 is described below
commit b36e7c459904860b0be086c144ba0b175961e805
Author: voonhous <[email protected]>
AuthorDate: Fri Jun 2 10:52:04 2023 +0800
[HUDI-6293] Make HoodieClusteringJob's parallelism of clustering_task more
reasonable (#8866)
---
.../org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 633f06b0e4f..223f85defca 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -310,9 +310,12 @@ public class HoodieFlinkClusteringJob {
HoodieInstant instant =
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+ int inputGroupSize = clusteringPlan.getInputGroups().size();
+
// get clusteringParallelism.
int clusteringParallelism =
conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
- ? clusteringPlan.getInputGroups().size() :
conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+ ? inputGroupSize
+ : Math.min(conf.getInteger(FlinkOptions.CLUSTERING_TASKS),
inputGroupSize);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant,
Option.empty());