[ 
https://issues.apache.org/jira/browse/HUDI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouxxyy reassigned HUDI-5327:
-----------------------------

    Assignee: zouxxyy

> ClusteringWithRecordsAsRow generates too many spark stages
> ----------------------------------------------------------
>
>                 Key: HUDI-5327
>                 URL: https://issues.apache.org/jira/browse/HUDI-5327
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: clustering
>            Reporter: zouxxyy
>            Assignee: zouxxyy
>            Priority: Major
>         Attachments: image-2022-12-03-19-24-27-106.png
>
>
> By set hoodie.datasource.write.row.writer.enable=true
> The spark job will generate too many spark stages when clusting
> !image-2022-12-03-19-24-27-106.png|width=1031,height=486!
> In addition to not looking concise, it will bring a hidden performance 
> bottleneck
> {code:java}
> Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
>         clusteringPlan.getInputGroups().stream()
>             .map(inputGroup -> {
>               if 
> (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
>  false)) {
>                 return runClusteringForGroupAsyncAsRow(inputGroup,
>                     clusteringPlan.getStrategy().getStrategyParams(),
>                     shouldPreserveMetadata,
>                     instantTime);
>               }
>               return runClusteringForGroupAsync(inputGroup,
>                   clusteringPlan.getStrategy().getStrategyParams(),
>                   shouldPreserveMetadata,
>                   instantTime);
>             })
>             .collect(Collectors.toList()))
>     .join()
>     .stream(); {code}
> `RunClusteringForGroupAsync`  will submit a spark job, and the default 
> `concurrency of FutureUtils.allOf()` is `the number of CPU cores -1` if no 
> executor is configured, which means that the maximum concurrency of spark 
> tasks = `the number of CPU cores -1` * `the number of files divided by each 
> clusting group`
> For our cluster, the driver has 32 cores, at the maximum concurrency of spark 
> tasks is 31 * 2, although our executor is configured with 20, each with 4 
> cores
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to