[
https://issues.apache.org/jira/browse/HUDI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zouxxyy updated HUDI-5327:
--------------------------
Description:
Setting hoodie.datasource.write.row.writer.enable=true causes this application to generate too many Spark jobs when executing clustering.
!image-2022-12-03-19-24-27-106.png|width=1031,height=486!
Besides cluttering the Spark UI, this introduces a hidden performance bottleneck:
{code:java}
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
    clusteringPlan.getInputGroups().stream()
        .map(inputGroup -> {
          if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
            return runClusteringForGroupAsyncAsRow(inputGroup,
                clusteringPlan.getStrategy().getStrategyParams(),
                shouldPreserveMetadata,
                instantTime);
          }
          return runClusteringForGroupAsync(inputGroup,
              clusteringPlan.getStrategy().getStrategyParams(),
              shouldPreserveMetadata,
              instantTime);
        })
        .collect(Collectors.toList()))
    .join()
    .stream(); {code}
`runClusteringForGroupAsyncAsRow` triggers a Spark job through its internal use of `collect`, and when no executor is configured the default concurrency of `FutureUtils.allOf()` is `the number of CPU cores - 1`. This means the maximum concurrency of Spark tasks is `(the number of CPU cores - 1) * (the number of files each clustering group is split into)`.
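The `cores - 1` default is standard JDK behavior, independent of Hudi: `CompletableFuture.supplyAsync` without an explicit executor runs on `ForkJoinPool.commonPool()`, whose default parallelism is `availableProcessors() - 1` (floored at 1). A quick check with plain JDK (not Hudi's `FutureUtils`):
{code:java}
import java.util.concurrent.ForkJoinPool;

public class CommonPoolCheck {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // By default the common pool uses availableProcessors() - 1
        // worker threads (minimum 1), so on a 32-core driver this is 31.
        int parallelism = ForkJoinPool.commonPool().getParallelism();
        System.out.println("cores = " + cores + ", parallelism = " + parallelism);
    }
}
{code}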
On our cluster the driver has 32 CPU cores, so the maximum concurrency of Spark tasks is 31 * 2, even though our executors are configured as 20 instances with 4 cores each.
Note that when hoodie.datasource.write.row.writer.enable=false this problem does not occur, because `runClusteringForGroupAsync` does not trigger a Spark job.
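One way to keep the fan-out bounded regardless of the driver's core count is to pass an explicit fixed-size executor to the async calls. A minimal sketch with plain `CompletableFuture` (the pool size of 4 and the simulated per-group work are illustrative, not Hudi code):
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BoundedAsync {
    public static void main(String[] args) {
        // A fixed-size pool caps how many "clustering groups" run
        // concurrently, no matter how many cores the driver has.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Integer>> futures = IntStream.range(0, 16)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * i, pool))
            .collect(Collectors.toList());
        // Wait for all groups, then combine their results.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        int sum = futures.stream().mapToInt(CompletableFuture::join).sum();
        System.out.println("sum = " + sum); // sum of squares 0..15 = 1240
        pool.shutdown();
    }
}
{code}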
> ClusteringWithRecordsAsRow generates too many spark jobs
> --------------------------------------------------------
>
> Key: HUDI-5327
> URL: https://issues.apache.org/jira/browse/HUDI-5327
> Project: Apache Hudi
> Issue Type: Improvement
> Components: clustering
> Reporter: zouxxyy
> Assignee: zouxxyy
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-12-03-19-24-27-106.png
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)