[
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554340#comment-16554340
]
Takeshi Yamamuro commented on SPARK-24906:
------------------------------------------
Ah, I see. It makes some sense to me. In DataSourceScanExec, when
`requiredSchema` has far fewer columns than `dataSchema`, we might consider an
additional term to make `maxSplitBytes` larger in `createNonBucketedReadRDD`.
https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L316
WDYT? [~smilegator] [~viirya]
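For discussion, a minimal sketch of that idea (pasteable into spark-shell; the
parameters `defaultMaxSplitBytes`, `openCostInBytes`, and `bytesPerCore` stand
in for the values createNonBucketedReadRDD already computes, and the
defaultSize-based ratio is just one possible heuristic, not the actual
implementation):
{code:scala}
import org.apache.spark.sql.types.StructType

// Sketch only: scale the split size by the estimated ratio of the full row
// width to the width of the columns that survive pruning (requiredSchema).
def adjustedMaxSplitBytes(
    defaultMaxSplitBytes: Long,   // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long,        // spark.sql.files.openCostInBytes
    bytesPerCore: Long,           // totalBytes / defaultParallelism
    dataSchema: StructType,
    requiredSchema: StructType): Long = {
  val fullWidth = math.max(1, dataSchema.map(_.dataType.defaultSize).sum)
  val readWidth = math.max(1, requiredSchema.map(_.dataType.defaultSize).sum)
  val columnRatio = fullWidth.toDouble / readWidth  // >= 1 once columns are pruned
  val base = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  (base * columnRatio).toLong
}
{code}
The scaled value would probably also need an upper bound (e.g., a new config)
so that very wide tables with tiny projections do not produce arbitrarily large
splits.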
> Enlarge split size for columnar files to ensure tasks read enough data
> -----------------------------------------------------------------------
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.1
> Reporter: Jason Guo
> Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png,
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png,
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files (e.g., Parquet), when Spark SQL reads a table, each split
> will be 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to
> 128 MB. Even when the user sets it to a larger value, such as 512 MB, a task
> may read only a few MB or even hundreds of KB, because the table may consist
> of dozens of columns while the query needs only a few of them, and Spark will
> prune the unnecessary columns.
>
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes
> adaptively.
> For example, suppose a table has 40 columns: 20 are integers and the other 20
> are longs. When a query reads one integer column and one long column,
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20.
>
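> That ratio can be checked quickly in spark-shell with Spark's per-type default
> size estimates (a sketch only; the schema below is made up to match the
> example above):
>
> {code:scala}
> import org.apache.spark.sql.types._
>
> // 40-column table: 20 int columns and 20 long columns.
> val dataSchema = StructType(
>   (1 to 20).map(i => StructField(s"i$i", IntegerType)) ++
>   (1 to 20).map(i => StructField(s"l$i", LongType)))
>
> // The query reads one int column and one long column.
> val requiredSchema = StructType(Seq(
>   StructField("i1", IntegerType), StructField("l1", LongType)))
>
> val fullWidth = dataSchema.map(_.dataType.defaultSize).sum      // 20*4 + 20*8 = 240
> val readWidth = requiredSchema.map(_.dataType.defaultSize).sum  // 4 + 8 = 12
> val ratio = fullWidth.toDouble / readWidth                      // 20.0
> {code}
>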
> With this optimization, the number of tasks will be smaller and the job will
> run faster. More importantly, for a very large cluster (more than 10 thousand
> nodes), it will relieve the resource manager's scheduling pressure.
>
> Here is the test
>
> The table named test2 has more than 40 columns and contains more than 5 TB of
> data for each hour.
> When we issue a very simple query
>
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>
> Most tasks last less than 1 second and read less than 1.5 MB of data
> !image-2018-07-24-20-28-06-269.png!
>
> After the optimization, there are only 1615 tasks and the job lasts only 30
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>
> The median data read per task is 44.2 MB.
> !image-2018-07-24-20-30-24-552.png!
>