[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375215#comment-17375215
 ] 

Cheng Su commented on SPARK-24528:
----------------------------------

Hi [~rahij] - glad to hear that the PR is working fine on your fork. Yes it's 
still on my plan, but was busy with other JIRAs in past months. In this week, 
let me rebase the PR to latest master and collect reviewer's feedback, thanks.

> Add support to read multiple sorted bucket files for data source v1
> -------------------------------------------------------------------
>
>                 Key: SPARK-24528
>                 URL: https://issues.apache.org/jira/browse/SPARK-24528
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> https://issues.apache.org/jira/browse/SPARK-24528#Closely related to  
> SPARK-24410, we're trying to optimize a very common use case we have of 
> getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we're still 
> "waste" time on the Sort operator evethough the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>     .repartition(col("key"))
>     .write
>   .mode(SaveMode.Overwrite)
>     .bucketBy(3, "key")
>     .sortBy("key", "t1")
>     .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
>     files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to