[
https://issues.apache.org/jira/browse/HUDI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zouxxyy reassigned HUDI-5327:
-----------------------------
Assignee: zouxxyy
> ClusteringWithRecordsAsRow generates too many spark stages
> ----------------------------------------------------------
>
> Key: HUDI-5327
> URL: https://issues.apache.org/jira/browse/HUDI-5327
> Project: Apache Hudi
> Issue Type: Improvement
> Components: clustering
> Reporter: zouxxyy
> Assignee: zouxxyy
> Priority: Major
> Attachments: image-2022-12-03-19-24-27-106.png
>
>
> By setting hoodie.datasource.write.row.writer.enable=true,
> the Spark job generates too many Spark stages when clustering:
> !image-2022-12-03-19-24-27-106.png|width=1031,height=486!
> Besides looking cluttered, this introduces a hidden performance
> bottleneck:
> {code:java}
> Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
>     clusteringPlan.getInputGroups().stream()
>         .map(inputGroup -> {
>           if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
>             return runClusteringForGroupAsyncAsRow(inputGroup,
>                 clusteringPlan.getStrategy().getStrategyParams(),
>                 shouldPreserveMetadata,
>                 instantTime);
>           }
>           return runClusteringForGroupAsync(inputGroup,
>               clusteringPlan.getStrategy().getStrategyParams(),
>               shouldPreserveMetadata,
>               instantTime);
>         })
>         .collect(Collectors.toList()))
>     .join()
>     .stream(); {code}
> `runClusteringForGroupAsync` submits a Spark job for each clustering group, and
> the default concurrency of `FutureUtils.allOf()` is the number of CPU cores - 1
> when no executor is configured. This means the maximum concurrency of Spark
> tasks is (number of driver CPU cores - 1) * (number of files in each
> clustering group).
> For our cluster, the driver has 32 cores, so the maximum concurrency of Spark
> tasks is 31 * 2, even though our executors are configured as 20 instances with
> 4 cores each.
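The pattern above (driver-side concurrency tied to driver core count rather than an explicit bound) can be avoided by passing a bounded executor to the async submissions. The following is a minimal standalone sketch, not Hudi code: `BoundedClusteringSketch` and `runGroups` are hypothetical names, and the `supplyAsync` body is a stand-in for `runClusteringForGroupAsyncAsRow`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BoundedClusteringSketch {

    // Run one async task per clustering group, but cap how many run at once by
    // supplying an explicit fixed-size pool instead of relying on the common
    // ForkJoinPool, whose default parallelism is (CPU cores - 1).
    static List<String> runGroups(int groupCount, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<CompletableFuture<String>> futures = IntStream.range(0, groupCount)
                    .mapToObj(i -> CompletableFuture.supplyAsync(
                            () -> "group-" + i, pool)) // stand-in for the per-group Spark job
                    .collect(Collectors.toList());
            // Wait for all groups, preserving order.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // At most 4 groups in flight at any moment, regardless of driver cores.
        List<String> results = runGroups(8, 4);
        System.out.println(results.size());
    }
}
```

With this shape, driver core count no longer dictates how many Spark jobs are submitted concurrently; the caller chooses the bound.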
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)