[ https://issues.apache.org/jira/browse/HUDI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zouxxyy updated HUDI-5327:
--------------------------
    Description: 
When hoodie.datasource.write.row.writer.enable=true is set, clustering 
generates too many Spark jobs:

!image-2022-12-03-19-24-27-106.png|width=1031,height=486!

Besides being hard to read, this introduces a hidden performance 
bottleneck:
{code:java}
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
        clusteringPlan.getInputGroups().stream()
            .map(inputGroup -> {
              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
                return runClusteringForGroupAsyncAsRow(inputGroup,
                    clusteringPlan.getStrategy().getStrategyParams(),
                    shouldPreserveMetadata,
                    instantTime);
              }
              return runClusteringForGroupAsync(inputGroup,
                  clusteringPlan.getStrategy().getStrategyParams(),
                  shouldPreserveMetadata,
                  instantTime);
            })
            .collect(Collectors.toList()))
    .join()
    .stream(); {code}
`runClusteringForGroupAsyncAsRow` triggers a Spark job because it calls 
collect internally. If no executor is configured, the futures joined by 
`FutureUtils.allOf()` run with a default concurrency of 
`the number of CPU cores - 1`, so the maximum concurrency of Spark tasks = 
`the number of CPU cores - 1` * `the number of files each clustering group is 
divided into`.
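
One way to decouple that limit from the driver's core count is to give the 
futures an explicitly sized executor. The following is a minimal sketch, not 
the Hudi implementation. It assumes the per-group futures are plain 
CompletableFuture instances created without an executor, in which case the JDK 
runs them on ForkJoinPool.commonPool(), whose default parallelism is the number 
of available processors minus one. BoundedClusteringSketch, peakConcurrency, 
and the sleep that stands in for a Spark job are hypothetical and exist only 
for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BoundedClusteringSketch {

  // Runs `groups` placeholder tasks on `pool` and returns the highest number
  // of tasks that were running at the same moment.
  static int peakConcurrency(int groups, ExecutorService pool) {
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();

    List<CompletableFuture<Void>> futures = IntStream.range(0, groups)
        .mapToObj(i -> CompletableFuture.runAsync(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            // Hypothetical stand-in for the Spark job one clustering group triggers.
            Thread.sleep(20);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
          }
        }, pool)) // explicit executor: concurrency is bounded by the pool size
        .collect(Collectors.toList());

    // Wait for every group to finish, as the join() in the snippet above does.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return peak.get();
  }

  public static void main(String[] args) {
    // What the JDK falls back to when no executor is passed.
    System.out.println("available processors: "
        + Runtime.getRuntime().availableProcessors());
    System.out.println("common pool parallelism: "
        + ForkJoinPool.commonPool().getParallelism());

    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      System.out.println("peak with a 4-thread pool: " + peakConcurrency(16, pool));
    } finally {
      pool.shutdown();
    }
  }
}
```

With a fixed pool the number of concurrently submitted groups never exceeds 
the pool size, whatever the driver hardware is. Whether a fixed pool, a 
configurable one, or a different job structure is the right fix for Hudi is 
left open here.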

On our cluster the driver has 32 CPU cores, so the maximum concurrency of 
Spark tasks is 31 * 2 = 62, even though we configured 20 executors with 4 
cores each.

 

When hoodie.datasource.write.row.writer.enable=false, this problem does not 
occur, because runClusteringForGroupAsync does not trigger a Spark job.

  was:
When hoodie.datasource.write.row.writer.enable=true is set, clustering 
generates too many Spark stages:

!image-2022-12-03-19-24-27-106.png|width=1031,height=486!

Besides being hard to read, this introduces a hidden performance 
bottleneck:
{code:java}
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
        clusteringPlan.getInputGroups().stream()
            .map(inputGroup -> {
              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
                return runClusteringForGroupAsyncAsRow(inputGroup,
                    clusteringPlan.getStrategy().getStrategyParams(),
                    shouldPreserveMetadata,
                    instantTime);
              }
              return runClusteringForGroupAsync(inputGroup,
                  clusteringPlan.getStrategy().getStrategyParams(),
                  shouldPreserveMetadata,
                  instantTime);
            })
            .collect(Collectors.toList()))
    .join()
    .stream(); {code}
`runClusteringForGroupAsyncAsRow` triggers a Spark stage because it calls 
collect internally. If no executor is configured, the futures joined by 
`FutureUtils.allOf()` run with a default concurrency of 
`the number of CPU cores - 1`, so the maximum concurrency of Spark tasks = 
`the number of CPU cores - 1` * `the number of files each clustering group is 
divided into`.

On our cluster the driver has 32 CPU cores, so the maximum concurrency of 
Spark tasks is 31 * 2 = 62, even though we configured 20 executors with 4 
cores each.

 

When hoodie.datasource.write.row.writer.enable=false, this problem does not 
occur, because runClusteringForGroupAsync does not trigger a Spark stage.


> ClusteringWithRecordsAsRow generates too many spark jobs
> --------------------------------------------------------
>
>                 Key: HUDI-5327
>                 URL: https://issues.apache.org/jira/browse/HUDI-5327
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: clustering
>            Reporter: zouxxyy
>            Assignee: zouxxyy
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-12-03-19-24-27-106.png



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
