[ https://issues.apache.org/jira/browse/SPARK-27332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809720#comment-16809720 ]

Liang-Chi Hsieh commented on SPARK-27332:
-----------------------------------------

I think this should already be fixed by SPARK-25482.
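
For anyone who wants to double-check on a build that includes that change, one rough way is to count how many times the subquery shows up in the physical plan string. A minimal sketch, assuming a SparkSession named {{spark}} and the r0/r1 views from the report below:

{code:scala}
// Sketch only: count occurrences of the scalar subquery in the executed plan.
// Assumes `spark` is an existing SparkSession and that r0 (parquet) and r1 (csv)
// are already registered as temp views, as in the reproduction below.
val df = spark.sql(
  "SELECT a, b FROM r0 WHERE a <= (SELECT AVG(a) FROM r1) LIMIT 5000")

val plan = df.queryExecution.executedPlan.toString
val subqueryCount = "Subquery subquery".r.findAllIn(plan).size

// In the report's plan the subquery appears both under the Filter and under the
// pushed-down FileScan; after the SPARK-25482 change it should appear only once.
println(s"'Subquery' nodes in the physical plan: $subqueryCount")
{code}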

> Filter Pushdown duplicates expensive ScalarSubquery (discarding result)
> -----------------------------------------------------------------------
>
>                 Key: SPARK-27332
>                 URL: https://issues.apache.org/jira/browse/SPARK-27332
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.0
>            Reporter: David Stewart Zink
>            Priority: Major
>
> Test Query:  SELECT a,b FROM r0 WHERE a <= (SELECT AVG(a) from r1) LIMIT 5000
> r0, r1 have schema {a:Double, b:String}
> If r0 is a CsvRelation then there is no problem, but if it is a ParquetRelation
> then the table scan is performed twice and the AVG(a) is simply discarded in one
> branch. This appears to have something to do with pushing the filter down
> into the table scan.
> Breakpoints in org.apache.spark.sql.execution.ScalarSubquery's methods
> updateResult(), eval(), and doGenCode() seem sufficient to demonstrate that
> one of the two computed scalars is never used.
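> A setup along these lines reproduces it (a sketch only; it assumes a SparkSession named spark and the /tmp/r0 and /tmp/r1 paths seen in the plans below; the original run used the external spark-csv CsvRelation source, so the built-in CSV reader here may render the scan node differently):
> {code:scala}
> // Sketch of the repro setup: r1 read as CSV, r0 as Parquet.
> // Swapping r0 to CSV as well gives the second (non-duplicated) plan further below.
> import org.apache.spark.sql.types._
>
> val schema = StructType(Seq(
>   StructField("a", DoubleType, nullable = false),
>   StructField("b", StringType, nullable = true)))
>
> spark.read.schema(schema).csv("/tmp/r1").createOrReplaceTempView("r1")
> spark.read.schema(schema).parquet("/tmp/r0").createOrReplaceTempView("r0")
>
> val q = spark.sql("SELECT a, b FROM r0 WHERE a <= (SELECT AVG(a) FROM r1) LIMIT 5000")
> q.explain(true)   // prints the parsed, analyzed, optimized, and physical plans
> {code}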
> The duplicated ScalarSubquery can be seen in this (physical) plan:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#8 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#9, b#10]
> +- Filter (a#9 <= scalar-subquery#8 [])
>    :  +- Aggregate [avg(a#15) AS avg(a)#22]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#15, b#16]
>    :           +- Relation[a#15,b#16] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#9, b#10]
>          +- Relation[a#9,b#10] parquet
> == Optimized Logical Plan ==
> Filter (isnotnull(a#9) && (a#9 <= scalar-subquery#8 []))
> :  +- Aggregate [avg(a#15) AS avg(a)#22]
> :     +- Project [a#15]
> :        +- Relation[a#15,b#16] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#9,b#10] parquet
> == Physical Plan ==
> *(1) Project [a#9, b#10]
> +- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
>    :  +- Subquery subquery8
>    :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
>    :        +- Exchange SinglePartition
>    :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>    :              +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
>    +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/r0], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:double,b:string>
>          +- Subquery subquery8
>             +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
>                +- Exchange SinglePartition
>                   +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>                      +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
> {noformat}
> By contrast, when I change r0 to be a CsvRelation:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#0 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#1, b#2]
> +- Filter (a#1 <= scalar-subquery#0 [])
>    :  +- Aggregate [avg(a#7) AS avg(a)#14]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#7, b#8]
>    :           +- Relation[a#7,b#8] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#1, b#2]
>          +- Relation[a#1,b#2] CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> == Optimized Logical Plan ==
> Filter (a#1 <= scalar-subquery#0 [])
> :  +- Aggregate [avg(a#7) AS avg(a)#14]
> :     +- Project [a#7]
> :        +- Relation[a#7,b#8] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#1,b#2] CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> == Physical Plan ==
> *(1) Filter (a#1 <= Subquery subquery0)
> :  +- Subquery subquery0
> :     +- *(2) HashAggregate(keys=[], functions=[avg(a#7)], output=[avg(a)#14])
> :        +- Exchange SinglePartition
> :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#7)], output=[sum#19, count#20L])
> :              +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#7] PushedFilters: [], ReadSchema: struct<a:double>
> +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#1,b#2] PushedFilters: [], ReadSchema: struct<a:double,b:string>
> {noformat}


