[ 
https://issues.apache.org/jira/browse/HUDI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouxxyy updated HUDI-5327:
--------------------------
    Summary: ClusteringWithRecordsAsRow generates too many spark jobs  (was: 
ClusteringWithRecordsAsRow generates too many spark stages)

> ClusteringWithRecordsAsRow generates too many spark jobs
> --------------------------------------------------------
>
>                 Key: HUDI-5327
>                 URL: https://issues.apache.org/jira/browse/HUDI-5327
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: clustering
>            Reporter: zouxxyy
>            Assignee: zouxxyy
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-12-03-19-24-27-106.png
>
>
> By setting hoodie.datasource.write.row.writer.enable=true, the Spark application generates too many jobs when clustering
> !image-2022-12-03-19-24-27-106.png|width=1031,height=486!
> Besides not looking concise, this introduces a hidden performance 
> bottleneck
> {code:java}
> Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
>         clusteringPlan.getInputGroups().stream()
>             .map(inputGroup -> {
>               if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
>                 return runClusteringForGroupAsyncAsRow(inputGroup,
>                     clusteringPlan.getStrategy().getStrategyParams(),
>                     shouldPreserveMetadata,
>                     instantTime);
>               }
>               return runClusteringForGroupAsync(inputGroup,
>                   clusteringPlan.getStrategy().getStrategyParams(),
>                   shouldPreserveMetadata,
>                   instantTime);
>             })
>             .collect(Collectors.toList()))
>     .join()
>     .stream();
> {code}
> `runClusteringForGroupAsyncAsRow` generates a Spark job through its internal 
> use of `collect`, and the default concurrency of `FutureUtils.allOf()` is 
> `the number of driver CPU cores - 1` when no executor is configured. That 
> means the maximum concurrency of Spark tasks is `(the number of driver CPU 
> cores - 1)` * `(the number of files in each clustering group)`.
> For our cluster, the driver has 32 CPU cores, so the maximum concurrency of 
> Spark tasks is 31 * 2, even though we configured 20 executors with 4 cores 
> each.
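> Where the `CPU cores - 1` default comes from can be shown with a minimal, 
> self-contained sketch (assuming `FutureUtils.allOf()` schedules its futures 
> via `CompletableFuture` with no explicit executor, so they run on the JVM's 
> `ForkJoinPool.commonPool()`):
> {code:java}
> import java.util.concurrent.ForkJoinPool;
>
> public class CommonPoolParallelism {
>   public static void main(String[] args) {
>     int cores = Runtime.getRuntime().availableProcessors();
>     // With no executor supplied, CompletableFuture.supplyAsync runs on the
>     // common pool, whose default parallelism is max(1, availableProcessors() - 1).
>     int parallelism = ForkJoinPool.commonPool().getParallelism();
>     System.out.println("cores = " + cores + ", common-pool parallelism = " + parallelism);
>     // On a 32-core driver: parallelism = 31, so at most 31 clustering groups
>     // submit their collect-backed Spark jobs concurrently.
>   }
> }
> {code}
> So the driver-side fan-out is bounded by the driver's core count, not by the 
> executor resources actually available to run the tasks.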
>  
> In addition, when hoodie.datasource.write.row.writer.enable=false this 
> problem does not occur, because runClusteringForGroupAsync does not generate 
> a Spark job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
