[ 
https://issues.apache.org/jira/browse/SPARK-27332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Stewart Zink updated SPARK-27332:
---------------------------------------
    Description: 
Test Query:  SELECT a,b FROM r0 WHERE a <= (SELECT AVG(a) from r1) LIMIT 5000

r0,r1 have schema \{a:Double, b:String}

If r0 is a CsvRelation there is no problem, but if it is a ParquetRelation then the 
table scan is performed twice and the AVG(a) result is simply discarded in one 
branch. This appears to be related to pushing the filter down into the table scan.

Breakpoints in org.apache.spark.sql.execution.ScalarSubquery's methods 
updateResult(), eval(), and doGenCode() seem sufficient to demonstrate that 
one of the two computed scalars is never used.
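A minimal spark-shell reproduction sketch (the sample data, the /tmp/r0 and /tmp/r1 paths, and the use of the built-in CSV reader in place of the external CsvRelation are my assumptions, chosen to match the plans below):

```scala
// Sketch: reproduce the duplicated scalar subquery in spark-shell (Spark 2.4).
// Writing r0 as Parquet triggers the duplication; writing it as CSV does not.
val df = Seq((1.0, "x"), (2.0, "y"), (3.0, "z")).toDF("a", "b")

df.write.mode("overwrite").parquet("/tmp/r0")                          // r0: Parquet (buggy case)
df.write.mode("overwrite").option("header", "true").csv("/tmp/r1")     // r1: CSV

spark.read.parquet("/tmp/r0").createOrReplaceTempView("r0")
spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/tmp/r1").createOrReplaceTempView("r1")

val q = spark.sql(
  "SELECT a,b FROM r0 WHERE a <= (SELECT AVG(a) FROM r1) LIMIT 5000")

// With r0 as Parquet, the physical plan prints "Subquery subquery..." twice:
// once under the Filter and once under the FileScan's pushed IsNotNull filter.
q.explain(true)
```

Running the same steps with r0 written as CSV instead of Parquet yields a plan containing the subquery only once, matching the second plan dump below.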

The duplicated ScalarSubquery can be seen in this (physical) plan:
{noformat}
== Parsed Logical Plan ==
'Project ['a, 'b]
+- 'Filter ('a <= scalar-subquery#8 [])
   :  +- 'Project [unresolvedalias('AVG('a), None)]
   :     +- 'UnresolvedRelation `r1`
   +- 'UnresolvedRelation `r0`

== Analyzed Logical Plan ==
a: double, b: string
Project [a#9, b#10]
+- Filter (a#9 <= scalar-subquery#8 [])
   :  +- Aggregate [avg(a#15) AS avg(a)#22]
   :     +- SubqueryAlias `r1`
   :        +- Project [a#15, b#16]
   :           +- Relation[a#15,b#16] 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null)
   +- SubqueryAlias `r0`
      +- Project [a#9, b#10]
         +- Relation[a#9,b#10] parquet

== Optimized Logical Plan ==
Filter (isnotnull(a#9) && (a#9 <= scalar-subquery#8 []))
:  +- Aggregate [avg(a#15) AS avg(a)#22]
:     +- Project [a#15]
:        +- Relation[a#15,b#16] 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null)
+- Relation[a#9,b#10] parquet

== Physical Plan ==
*(1) Project [a#9, b#10]
+- *(1) Filter (isnotnull(a#9) && (a#9 <= Subquery subquery8))
   :  +- Subquery subquery8
   :     +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], 
output=[avg(a)#22])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], 
output=[sum#27, count#28L])
   :              +- *(1) Scan 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], 
ReadSchema: struct<a:double>
   +- *(1) FileScan parquet [a#9,b#10] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/tmp/r0], PartitionFilters: [], PushedFilters: 
[IsNotNull(a)], ReadSchema: struct<a:double,b:string>
         +- Subquery subquery8
            +- *(2) HashAggregate(keys=[], functions=[avg(a#15)], 
output=[avg(a)#22])
               +- Exchange SinglePartition
                  +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#15)], 
output=[sum#27, count#28L])
                     +- *(1) Scan 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null) [a#15] PushedFilters: [], 
ReadSchema: struct<a:double>
{noformat}
By contrast, when I change r0 to a CsvRelation:
{noformat}
== Parsed Logical Plan ==
'Project ['a, 'b]
+- 'Filter ('a <= scalar-subquery#0 [])
   :  +- 'Project [unresolvedalias('AVG('a), None)]
   :     +- 'UnresolvedRelation `r1`
   +- 'UnresolvedRelation `r0`

== Analyzed Logical Plan ==
a: double, b: string
Project [a#1, b#2]
+- Filter (a#1 <= scalar-subquery#0 [])
   :  +- Aggregate [avg(a#7) AS avg(a)#14]
   :     +- SubqueryAlias `r1`
   :        +- Project [a#7, b#8]
   :           +- Relation[a#7,b#8] 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null)
   +- SubqueryAlias `r0`
      +- Project [a#1, b#2]
         +- Relation[a#1,b#2] 
CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null)

== Optimized Logical Plan ==
Filter (a#1 <= scalar-subquery#0 [])
:  +- Aggregate [avg(a#7) AS avg(a)#14]
:     +- Project [a#7]
:        +- Relation[a#7,b#8] 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null)
+- Relation[a#1,b#2] 
CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null)

== Physical Plan ==
*(1) Filter (a#1 <= Subquery subquery0)
:  +- Subquery subquery0
:     +- *(2) HashAggregate(keys=[], functions=[avg(a#7)], output=[avg(a)#14])
:        +- Exchange SinglePartition
:           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#7)], 
output=[sum#19, count#20L])
:              +- *(1) Scan 
CsvRelation(<function0>,Some(/tmp/r1),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null) [a#7] PushedFilters: [], 
ReadSchema: struct<a:double>
+- *(1) Scan 
CsvRelation(<function0>,Some(/tmp/r0),true,,,",\,#,PERMISSIVE,COMMONS,false,false,true,StructType(StructField(a,DoubleType,false),
 StructField(b,StringType,true)),false,null,,null) [a#1,b#2] PushedFilters: [], 
ReadSchema: struct<a:double,b:string>
{noformat}


> Filter Pushdown duplicates expensive ScalarSubquery (discarding result)
> -----------------------------------------------------------------------
>
>                 Key: SPARK-27332
>                 URL: https://issues.apache.org/jira/browse/SPARK-27332
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.0
>            Reporter: David Stewart Zink
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
