[ https://issues.apache.org/jira/browse/SPARK-27332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809720#comment-16809720 ]
Liang-Chi Hsieh commented on SPARK-27332:
-----------------------------------------

I think this should already be fixed by SPARK-25482.

> Filter Pushdown duplicates expensive ScalarSubquery (discarding result)
> -----------------------------------------------------------------------
>
>                 Key: SPARK-27332
>                 URL: https://issues.apache.org/jira/browse/SPARK-27332
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.0
>            Reporter: David Stewart Zink
>            Priority: Major
>
> Test query:
> SELECT a, b FROM r0 WHERE a <= (SELECT AVG(a) FROM r1) LIMIT 5000
> r0 and r1 both have schema {a:Double, b:String}.
> If r0 is a CsvRelation there is no problem, but if it is a ParquetRelation then
> the table scan is performed twice and the AVG(a) is simply discarded in one
> branch. This appears to be caused by pushing the filter down into the table scan.
> Breakpoints in org.apache.spark.sql.execution.ScalarSubquery's methods
> updateResult(), eval(), and doGenCode() are sufficient to demonstrate that
> one of the two computed scalars is never used.
> The duplicated ScalarSubquery can be seen in this (physical) plan:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#8 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
>
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#9, b#10]
> +- Filter (a#9 <= scalar-subquery#8 [])
>    :  +- Aggregate [avg(a#15) AS avg(a)#22]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#15, b#16]
>    :           +- Relation[a#15,b#16] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#9, b#10]
>          +- Relation[a#9,b#10] parquet
>
> == Optimized Logical Plan ==
> Filter (isnotnull(a#9) && (a#9 <= scalar-subquery#8 []))
> :  +- Aggregate [avg(a#15) AS avg(a)#22]
> :     +- Project [a#15]
> :        +- Relation[a#15,b#16] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#9,b#10] parquet
>
> == Physical Plan ==
> *(1) Project [a#9, b#10]
> +- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
>    :  +- Subquery subquery8
>    :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
>    :        +- Exchange SinglePartition
>    :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>    :              +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
>    +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/r0], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:double,b:string>
>          +- Subquery subquery8
>             +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
>                +- Exchange SinglePartition
>                   +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>                      +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
> {noformat}
> Whereas when I change r0 to be a CsvRelation:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#0 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
>
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#1, b#2]
> +- Filter (a#1 <= scalar-subquery#0 [])
>    :  +- Aggregate [avg(a#7) AS avg(a)#14]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#7, b#8]
>    :           +- Relation[a#7,b#8] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#1, b#2]
>          +- Relation[a#1,b#2] CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>
> == Optimized Logical Plan ==
> Filter (a#1 <= scalar-subquery#0 [])
> :  +- Aggregate [avg(a#7) AS avg(a)#14]
> :     +- Project [a#7]
> :        +- Relation[a#7,b#8] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#1,b#2] CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>
> == Physical Plan ==
> *(1) Filter (a#1 <= Subquery subquery0)
> :  +- Subquery subquery0
> :     +- *(2) HashAggregate(keys=[], functions=[avg(a#7)], output=[avg(a)#14])
> :        +- Exchange SinglePartition
> :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#7)], output=[sum#19, count#20L])
> :              +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#7] PushedFilters: [], ReadSchema: struct<a:double>
> +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#1,b#2] PushedFilters: [], ReadSchema: struct<a:double,b:string>
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org