[ https://issues.apache.org/jira/browse/SPARK-27332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806242#comment-16806242 ]
David Stewart Zink commented on SPARK-27332:
--------------------------------------------
Oh yeah, since each ScalarSubquery takes 45 minutes in my real-world case (on only 128
cores), it's very easy to see that the work *is* being duplicated.
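Besides the wall-clock cost, the duplication is visible directly in the extended `explain()` output quoted below (e.g. the text printed by `Dataset.explain(true)`), where the Parquet physical plan contains the `Subquery subquery8` node twice. A minimal Python sketch for checking that mechanically on plan text; the helper names are hypothetical, not part of Spark, and the embedded plan is an abbreviated copy of the one in this report. It only counts tree-node lines (`+- Subquery ...`), not the reference inside the filter expression, and it does not depend on indentation.

```python
import re
from collections import Counter

# Matches a plan-tree node such as "+- Subquery subquery8", but not a reference
# inside an expression such as "(a#9 <= Subquery subquery8)".
_SUBQUERY_NODE = re.compile(r"\+- Subquery (subquery\d+)")


def subquery_node_counts(plan_text: str) -> Counter:
    """Count how many times each subquery appears as a node in explain() text."""
    return Counter(_SUBQUERY_NODE.findall(plan_text))


def duplicated_subqueries(plan_text: str) -> list:
    """Return, sorted, the subquery ids that are planned more than once."""
    counts = subquery_node_counts(plan_text)
    return sorted(name for name, n in counts.items() if n > 1)


# Abbreviated excerpt of the Parquet physical plan in this report.
parquet_plan = """\
*(1) Project [a#9, b#10]
+- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
   :  +- Subquery subquery8
   :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
   +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, PushedFilters: [IsNotNull(a)]
         +- Subquery subquery8
            +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
"""

print(duplicated_subqueries(parquet_plan))  # ['subquery8']
```

Run against the CSV plan in the report, the same check finds `subquery0` only once and returns an empty list.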
> Filter Pushdown duplicates expensive ScalarSubquery (discarding result)
> -----------------------------------------------------------------------
>
> Key: SPARK-27332
> URL: https://issues.apache.org/jira/browse/SPARK-27332
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.4.0
> Reporter: David Stewart Zink
> Priority: Major
>
> Test Query: SELECT a,b FROM r0 WHERE a <= (SELECT AVG(a) from r1) LIMIT 5000
> r0, r1 have schema {a: Double, b: String}
> If r0 is a CsvRelation there is no problem, but if it is a ParquetRelation
> then the subquery, including its table scan of r1, is executed twice and the
> AVG(a) from one branch is simply discarded. This appears to have something to
> do with pushing the filter down into the table scan.
> Breakpoints in ScalarSubquery's methods updateResult(), eval(), and
> doGenCode() seem sufficient to demonstrate that one of the two computed
> scalars is never used.
> The duplicated ScalarSubquery can be seen in this (physical) plan:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#8 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#9, b#10]
> +- Filter (a#9 <= scalar-subquery#8 [])
>    :  +- Aggregate [avg(a#15) AS avg(a)#22]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#15, b#16]
>    :           +- Relation[a#15,b#16] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#9, b#10]
>          +- Relation[a#9,b#10] parquet
> == Optimized Logical Plan ==
> Filter (isnotnull(a#9) && (a#9 <= scalar-subquery#8 []))
> :  +- Aggregate [avg(a#15) AS avg(a)#22]
> :     +- Project [a#15]
> :        +- Relation[a#15,b#16] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#9,b#10] parquet
> == Physical Plan ==
> *(1) Project [a#9, b#10]
> +- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
>    :  +- Subquery subquery8
>    :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
>    :        +- Exchange SinglePartition
>    :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>    :              +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
>    +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/r0], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:double,b:string>
>          +- Subquery subquery8
>             +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
>                +- Exchange SinglePartition
>                   +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
>                      +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
> {noformat}
> Whereas when I change r0 to be a CsvRelation, the subquery appears only once:
> {noformat}
> == Parsed Logical Plan ==
> 'Project ['a, 'b]
> +- 'Filter ('a <= scalar-subquery#0 [])
>    :  +- 'Project [unresolvedalias('AVG('a), None)]
>    :     +- 'UnresolvedRelation `r1`
>    +- 'UnresolvedRelation `r0`
> == Analyzed Logical Plan ==
> a: double, b: string
> Project [a#1, b#2]
> +- Filter (a#1 <= scalar-subquery#0 [])
>    :  +- Aggregate [avg(a#7) AS avg(a)#14]
>    :     +- SubqueryAlias `r1`
>    :        +- Project [a#7, b#8]
>    :           +- Relation[a#7,b#8] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
>    +- SubqueryAlias `r0`
>       +- Project [a#1, b#2]
>          +- Relation[a#1,b#2] CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> == Optimized Logical Plan ==
> Filter (a#1 <= scalar-subquery#0 [])
> :  +- Aggregate [avg(a#7) AS avg(a)#14]
> :     +- Project [a#7]
> :        +- Relation[a#7,b#8] CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> +- Relation[a#1,b#2] CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null)
> == Physical Plan ==
> *(1) Filter (a#1 <= Subquery subquery0)
> :  +- Subquery subquery0
> :     +- *(2) HashAggregate(keys=[], functions=[avg(a#7)], output=[avg(a)#14])
> :        +- Exchange SinglePartition
> :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#7)], output=[sum#19, count#20L])
> :              +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#7] PushedFilters: [], ReadSchema: struct<a:double>
> +- *(1) Scan CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false), StructField(b,StringType,true)),false,null,,null) [a#1,b#2] PushedFilters: [], ReadSchema: struct<a:double,b:string>
> {noformat}
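Comparing the two physical plans, the difference sits at the scan of r0: the Parquet `FileScan` reports `PushedFilters: [IsNotNull(a)]` and has a second `Subquery subquery8` printed directly beneath it, while the CSV `Scan` of r0 pushes nothing and has no subquery under it. The Python sketch below makes that comparison explicit on plan text. It is only a reading aid under stated assumptions: `scan_summary` is a hypothetical helper, not a Spark API; it assumes the indentation Spark uses when printing plan trees (a node's children are indented further than the node itself); and the embedded plans are abbreviated copies of the ones above, with the CsvRelation arguments elided. It does not explain why Spark attaches the subquery to the scan.

```python
import re

# A scan node line, e.g. "+- *(1) FileScan parquet [...]" or "+- *(1) Scan CsvRelation(...)".
_SCAN_NODE = re.compile(r"\+- (?:\*\(\d+\) )?((?:File)?Scan \S+)")
_PUSHED_FILTERS = re.compile(r"PushedFilters: \[([^\]]*)\]")
# A subquery plan node, e.g. "+- Subquery subquery8" (not a reference inside an expression).
_SUBQUERY_NODE = re.compile(r"\+- Subquery (subquery\d+)")


def scan_summary(plan_text):
    """Return (scan, pushed_filters, nested_subqueries) for every scan node.

    A later line belongs under a scan if its "+- " marker is indented further
    than the scan's own marker; the walk stops at the first line that is not.
    """
    lines = plan_text.splitlines()
    summary = []
    for i, line in enumerate(lines):
        scan = _SCAN_NODE.search(line)
        if scan is None:
            continue
        column = scan.start()  # column of this scan's "+-" marker
        pushed = _PUSHED_FILTERS.search(line)
        nested = []
        for later in lines[i + 1:]:
            if later.find("+- ") <= column:  # -1 (no marker) also ends the walk
                break
            sub = _SUBQUERY_NODE.search(later)
            if sub:
                nested.append(sub.group(1))
        summary.append((scan.group(1), pushed.group(1) if pushed else None, nested))
    return summary


# Abbreviated physical plans from this report (CsvRelation arguments elided).
parquet_plan = """\
*(1) Project [a#9, b#10]
+- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
   :  +- Subquery subquery8
   :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], output=[sum#27, count#28L])
   :              +- *(1) Scan CsvRelation(...) [a#15] PushedFilters: [], ReadSchema: struct<a:double>
   +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:double,b:string>
         +- Subquery subquery8
            +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], output=[avg(a)#22])
"""

csv_plan = """\
*(1) Filter (a#1 <= Subquery subquery0)
:  +- Subquery subquery0
:     +- *(2) HashAggregate(keys=[], functions=[avg(a#7)], output=[avg(a)#14])
:        +- Exchange SinglePartition
:           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#7)], output=[sum#19, count#20L])
:              +- *(1) Scan CsvRelation(...) [a#7] PushedFilters: [], ReadSchema: struct<a:double>
+- *(1) Scan CsvRelation(...) [a#1,b#2] PushedFilters: [], ReadSchema: struct<a:double,b:string>
"""

for name, plan in (("parquet r0", parquet_plan), ("csv r0", csv_plan)):
    for row in scan_summary(plan):
        print(name, row)
```

On these excerpts, only the Parquet `FileScan` row carries both a non-empty pushed filter (`IsNotNull(a)`) and a nested `subquery8`; every CSV scan row has empty pushed filters and no nested subquery.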