[ 
https://issues.apache.org/jira/browse/SPARK-38386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alfred Xu updated SPARK-38386:
------------------------------
    Description: 
The idea of this issue originated from 
[https://github.com/NVIDIA/spark-rapids/issues/4186]

Currently, Spark SQL executes each non-correlated scalar subquery as an 
independent Spark job, so a query with many non-correlated scalar subqueries 
generates a lot of Spark jobs. Scenarios like this can be optimized at the 
logical-plan level: we can combine the subquery plans of compatible scalar 
subqueries into fused subquery plans and let those fused plans be shared by 
multiple scalar subqueries. By combining compatible scalar subqueries, we cut 
down the cost of the subquery jobs, because the common parts of compatible 
subquery plans (scans/filters) are reused.
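To make "compatible" a bit more concrete, here is a minimal sketch of the kind of check such a rule might perform, written against Catalyst roughly as of Spark 3.3. The helpers source and isCompatible are purely illustrative assumptions, not part of any proposed implementation:
{code:scala}
import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

// Hypothetical helper: look through a top-level Project so that aggregates
// that only project different columns of the same relation can still match.
def source(plan: LogicalPlan): LogicalPlan = plan match {
  case Project(_, child) => child
  case other             => other
}

// Hypothetical compatibility check: both subqueries are non-correlated global
// aggregates computed over the same underlying sub-plan, so their common
// scans/filters could be shared by one fused subquery plan.
def isCompatible(a: ScalarSubquery, b: ScalarSubquery): Boolean =
  (a.plan, b.plan) match {
    case (Aggregate(groupA, _, childA), Aggregate(groupB, _, childB))
        if groupA.isEmpty && groupB.isEmpty =>
      a.outerAttrs.isEmpty && b.outerAttrs.isEmpty &&
        source(childA).sameResult(source(childB))
    case _ => false
  } {code}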

 

Here is an example to demonstrate the basic idea of combining compatible scalar 
subqueries:
{code:java}
SELECT SUM(i)
FROM t
WHERE l > (SELECT MIN(l2) FROM t)
AND l2 < (SELECT MAX(l) FROM t)
AND i2 <> (SELECT MAX(i2) FROM t)
AND i2 <> (SELECT MIN(i2) FROM t) {code}
The optimized logical plan of the above query looks like:
{code:java}
Aggregate [sum(i)]
+- Project [i]
  +- Filter (((l > scalar-subquery#1) AND (l2 < scalar-subquery#2)) AND (NOT (i2 = scalar-subquery#3) AND NOT (i2 = scalar-subquery#4)))
     :  :- Aggregate [min(l2)]
     :  :  +- Project [l2]
     :  :     +- Relation [l,l2,i,i2]
     :  :- Aggregate [max(l)]
     :  :  +- Project [l]
     :  :     +- Relation [l,l2,i,i2]
     :  :- Aggregate [max(i2)]
     :  :  +- Project [i2]
     :  :     +- Relation [l,l2,i,i2]
     :  +- Aggregate [min(i2)]
     :     +- Project [i2]
     :        +- Relation [l,l2,i,i2]
     +- Relation [l,l2,i,i2] {code}
After combining the compatible scalar subqueries, the logical plan becomes:
{code:java}
Aggregate [sum(i)]
+- Project [i]
  +- Filter (((l > shared-scalar-subquery#1) AND (l2 < shared-scalar-subquery#2)) AND (NOT (i2 = shared-scalar-subquery#3) AND NOT (i2 = shared-scalar-subquery#4)))
     :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
     :  :  +- Project [l2,l,i2]
     :  :     +- Relation [l,l2,i,i2]
     :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
     :  :  +- Project [l2,l,i2]
     :  :     +- Relation [l,l2,i,i2]
     :  :- Aggregate [min(l2),max(l),max(i2),min(i2)]
     :  :  +- Project [l2,l,i2]
     :  :     +- Relation [l,l2,i,i2]
     :  +- Aggregate [min(l2),max(l),max(i2),min(i2)]
     :     +- Project [l2,l,i2]
     :        +- Relation [l,l2,i,i2]
     +- Relation [l,l2,i,i2] {code}
 

There are four scalar subqueries in this query. Although they are semantically 
different, they are all based on the same relation. Therefore, we can merge 
them into a single unified Aggregate and reuse the common scan (relation). 
Because the merged subquery plans are identical, they can also be deduplicated 
when the physical plan is built (for example via subquery/exchange reuse), so 
the shared scan and aggregation only need to run once.
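For reference, the combined plan above is semantically equivalent to hand-rewriting the query so that all four aggregates are computed in a single pass over t. A rough sketch of that rewrite is shown below; the CTE name m and the column aliases are made up, and it assumes the table t from the example is already registered:
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("combine-scalar-subqueries-demo").getOrCreate()

// One shared aggregate computes all four values in a single pass over t;
// each original scalar subquery then just reads one column of that result.
spark.sql("""
  WITH m AS (
    SELECT MIN(l2) AS min_l2, MAX(l) AS max_l,
           MAX(i2) AS max_i2, MIN(i2) AS min_i2
    FROM t
  )
  SELECT SUM(i)
  FROM t
  WHERE l  > (SELECT min_l2 FROM m)
    AND l2 < (SELECT max_l  FROM m)
    AND i2 <> (SELECT max_i2 FROM m)
    AND i2 <> (SELECT min_i2 FROM m)
""").show() {code}
The proposed optimization would achieve the same effect automatically at the logical-plan level, without users having to restructure their queries.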

 

 


> Combine compatible scalar subqueries
> ------------------------------------
>
>                 Key: SPARK-38386
>                 URL: https://issues.apache.org/jira/browse/SPARK-38386
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 3.3.0
>            Reporter: Alfred Xu
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
