[ 
https://issues.apache.org/jira/browse/SPARK-27332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810249#comment-16810249
 ] 

David Stewart Zink commented on SPARK-27332:
--------------------------------------------

I'm a little confused by that comment chain.

Obviously, current Spark completes all sub-queries before processing the query; 
and therefore the output of each sub-query is reduced to a Concrete Table, 
including sub-types such as the Row, or in this case a Value, i.e. a 
Single-Row-Single-Column.

Obviously, Spark supports filter push-down to DataSources because it can 
improve performance, for example by pruning file lists.

SPARK-25482 purports to "fix" the problem by not pushing down this single value 
as part of a filter to the DataSource? That doesn't seem to make any sense, 
unless there is some inability to adjust the filter immediately before the 
stage runs?
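
To make the expectation concrete, here is a minimal sketch in plain Python. It is 
not Spark code, and every name in it (ScalarSubquery, scan_with_pushdown, 
run_query) is hypothetical. It assumes a scalar sub-query whose result is 
computed once, cached, and then substituted as a plain value both into the 
filter pushed to the data source and into the residual filter above the scan.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

Row = Tuple[float, str]  # toy rows with schema {a: Double, b: String}


@dataclass
class ScalarSubquery:
    """Toy stand-in for a scalar sub-query (hypothetical, not Spark's class)."""
    compute: Callable[[], float]
    executions: int = 0
    _result: Optional[float] = None
    _done: bool = False

    def value(self) -> float:
        # Run the expensive computation at most once and cache the result.
        if not self._done:
            self.executions += 1
            self._result = self.compute()
            self._done = True
        return self._result


def scan_with_pushdown(rows: List[Row], upper_bound: float) -> List[Row]:
    """Toy data source that accepts a pushed-down 'a <= upper_bound' filter."""
    return [r for r in rows if r[0] <= upper_bound]


def run_query(r0: List[Row], r1: List[Row], limit: int):
    """Models: SELECT a,b FROM r0 WHERE a <= (SELECT AVG(a) FROM r1) LIMIT n."""
    avg_a = ScalarSubquery(lambda: sum(a for a, _ in r1) / len(r1))
    # The scan and the residual filter share ONE sub-query instance, so the
    # second use reads the cached value instead of recomputing it.
    pushed = scan_with_pushdown(r0, avg_a.value())
    result = [r for r in pushed if r[0] <= avg_a.value()][:limit]
    return result, avg_a.executions
```

In this toy model, run_query returns the filtered rows together with the number 
of times the sub-query was actually computed, which stays at one even though 
its value is consumed in two places. Whether Spark's planner can share the 
result this way is exactly the open question above.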

> Filter Pushdown duplicates expensive ScalarSubquery (discarding result)
> -----------------------------------------------------------------------
>
>                 Key: SPARK-27332
>                 URL: https://issues.apache.org/jira/browse/SPARK-27332
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.0
>            Reporter: David Stewart Zink
>            Priority: Major
>
> Test Query:  SELECT a,b FROM r0 WHERE a <= (SELECT AVG(a) from r1) LIMIT 5000
> r0 and r1 have schema {a:Double, b:String}
> If r0 is a CsvRelation then there is no problem, but if it is a 
> ParquetRelation then the table scan is performed twice and the AVG(a) is 
> simply discarded in one branch. This appears to have something to do with 
> pushing the filter down into the table scan.
> Breakpoints in org.apache.spark.sql.execution.ScalarSubquery's methods 
> updateResult(), eval(), and doGenCode() seem sufficient to demonstrate that 
> one of the two computed scalars is never used.
> The duplicated ScalarSubquery can be seen in this (physical) plan:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#8 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#9, b#10]
> +- Filter (a#9 <= scalar-subquery#8 [])
>    :  +- Aggregate [avg(a#15) AS avg(a)#22]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#15, b#16]
>    :           +- Relation[a#15,b#16] 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#9, b#10]
>          +- Relation[a#9,b#10] parquet
> == Optimized Logical Plan ==
> Filter (isnotnull(a#9) && (a#9 <= scalar-subquery#8 []))
> :  +- Aggregate [avg(a#15) AS avg(a)#22]
> :     +- Project [a#15]
> :        +- Relation[a#15,b#16] 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#9,b#10] parquet
> == Physical Plan ==
> *(1) Project [a#9, b#10]
> +- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
>    :  +- Subquery subquery8
>    :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], 
> output=[avg(a)#22])
>    :        +- Exchange SinglePartition
>    :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], 
> output=[sum#27, count#28L])
>    :              +- *(1) Scan 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], 
> ReadSchema: struct<a:double>
>    +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/tmp/r0], PartitionFilters: [], 
> PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:double,b:string>
>          +- Subquery subquery8
>             +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], 
> output=[avg(a)#22])
>                +- Exchange SinglePartition
>                   +- *(1) HashAggregate(keys=[], 
> functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>                      +- *(1) Scan 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], 
> ReadSchema: struct<a:double>
> {noformat}
> Whereas when I change r0 to be a CsvRelation:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#0 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#1, b#2]
> +- Filter (a#1 <= scalar-subquery#0 [])
>    :  +- Aggregate [avg(a#7) AS avg(a)#14]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#7, b#8]
>    :           +- Relation[a#7,b#8] 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#1, b#2]
>          +- Relation[a#1,b#2] 
> CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null)
> == Optimized Logical Plan ==
> Filter (a#1 <= scalar-subquery#0 [])
> :  +- Aggregate [avg(a#7) AS avg(a)#14]
> :     +- Project [a#7]
> :        +- Relation[a#7,b#8] 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#1,b#2] 
> CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null)
> == Physical Plan ==
> *(1) Filter (a#1 <= Subquery subquery0)
> :  +- Subquery subquery0
> :     +- *(2) HashAggregate(keys=[], functions=[avg(a#7)], output=[avg(a)#14])
> :        +- Exchange SinglePartition
> :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#7)], 
> output=[sum#19, count#20L])
> :              +- *(1) Scan 
> CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null) [a#7] PushedFilters: [], 
> ReadSchema: struct<a:double>
> +- *(1) Scan 
> CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
>  StructField(b,StringType,true)),false,null,,null) [a#1,b#2] PushedFilters: 
> [], ReadSchema: struct<a:double,b:string>
> {noformat}



