[ 
https://issues.apache.org/jira/browse/FLINK-19641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-19641:
--------------------------------
    Description: 
The current implementation of {{HiveTableSource#createBatchSource}} for 
calculating parallelism directly uses 
{{inputFormat.createInputSplits(0).length}} as the number of splits. However, 
{{createInputSplits}} can be costly, as it reads some data from every source 
file, especially when the table is not partitioned and the number of files is 
large.

Many Hive tables maintain their file count as table metadata, and the number 
of splits is always at least the number of files. So we can first fetch the 
file count (at almost no cost), and if it already reaches the maximum 
parallelism we can use the maximum parallelism directly, without calling 
{{createInputSplits}}.

This is a significant optimization for the current Flink TPCDS benchmark, 
which creates some tables with 15,000 files and no partitioning. The 
optimization improves the performance of the whole benchmark by 300 seconds or 
more.
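The proposed check can be sketched as follows. This is a minimal illustration, 
not the actual Flink API: the method name {{decideParallelism}} and the 
{{IntSupplier}} standing in for {{createInputSplits}} are hypothetical.

```java
import java.util.function.IntSupplier;

public class ParallelismSketch {

    // Hypothetical sketch of the proposed logic. fileCount comes from the
    // cheap table metadata lookup; splitCounter stands in for the costly
    // inputFormat.createInputSplits(0).length call.
    static int decideParallelism(int fileCount, int maxParallelism, IntSupplier splitCounter) {
        // The number of splits is at least the number of files, so if the
        // file count already reaches the cap we can skip split enumeration.
        if (fileCount >= maxParallelism) {
            return maxParallelism;
        }
        // Otherwise fall back to the expensive path and cap the result.
        return Math.min(splitCounter.getAsInt(), maxParallelism);
    }

    public static void main(String[] args) {
        // Cheap path: 15000 files, cap 100 -> the supplier is never invoked.
        int fast = decideParallelism(15000, 100, () -> {
            throw new IllegalStateException("createInputSplits should not be called");
        });
        System.out.println(fast); // 100

        // Expensive path: too few files to decide, so splits are counted.
        System.out.println(decideParallelism(3, 100, () -> 7)); // 7
    }
}
```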

  was:
The current implementation of {{HiveTableSource#calculateParallelism}} directly 
uses {{inputFormat.createInputSplits(0).length}} as the number of splits. 
However, {{createInputSplits}} can be costly, as it reads some data from every 
source file, especially when the table is not partitioned and the number of 
files is large.

Many Hive tables maintain their file count as table metadata, and the number 
of splits is always at least the number of files. So we can first fetch the 
file count (at almost no cost), and if it already reaches the maximum 
parallelism we can use the maximum parallelism directly, without calling 
{{createInputSplits}}.

This is a significant optimization for the current Flink TPCDS benchmark, 
which creates some tables with 15,000 files and no partitioning. The 
optimization improves the performance of the whole benchmark by 300 seconds or 
more.


> Optimize parallelism calculation of HiveTableSource by checking file number
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-19641
>                 URL: https://issues.apache.org/jira/browse/FLINK-19641
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Hive
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>             Fix For: 1.12.0
>
>
> The current implementation of {{HiveTableSource#createBatchSource}} for 
> calculating parallelism directly uses 
> {{inputFormat.createInputSplits(0).length}} as the number of splits. However, 
> {{createInputSplits}} can be costly, as it reads some data from every source 
> file, especially when the table is not partitioned and the number of files is 
> large.
> Many Hive tables maintain their file count as table metadata, and the number 
> of splits is always at least the number of files. So we can first fetch the 
> file count (at almost no cost), and if it already reaches the maximum 
> parallelism we can use the maximum parallelism directly, without calling 
> {{createInputSplits}}.
> This is a significant optimization for the current Flink TPCDS benchmark, 
> which creates some tables with 15,000 files and no partitioning. The 
> optimization improves the performance of the whole benchmark by 300 seconds 
> or more.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
