[ https://issues.apache.org/jira/browse/SPARK-14097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaurav Tiwari updated SPARK-14097:
----------------------------------
    Description: 
I am trying to execute a simple query that joins 3 tables. The execution plan 
varies with the position of the tables in the "from" clause of the query. The 
plan looks more optimized when the tables with predicates are listed before 
the other tables. 

a) Original query:

select distinct pge.portfolio_code 
from table1 pge join table2 p 
on p.perm_group = pge.anc_port_group 
join table3 uge 
on p.user_group=uge.anc_user_group 
where uge.user_name = 'user' and p.perm_type = 'TEST' 


b) Optimized query (tables with predicates moved ahead):

select distinct pge.portfolio_code 
from table3 uge, table2 p, table1 pge 
where uge.user_name = 'user' and p.perm_type = 'TEST' 
and p.perm_group = pge.anc_port_group 
and p.user_group = uge.anc_user_group
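
To illustrate why the join order matters, here is a minimal sketch in plain Python (not Spark). The table contents are hypothetical toy data and the `hash_join` helper is made up for this example; the real table sizes are not known from this report. It runs the same three-way join in both orders and compares the size of the intermediate result.

```python
# Toy rows. Every value here is hypothetical and only illustrates join order.
table1 = [{"anc_port_group": g, "portfolio_code": f"PC{g}"} for g in range(100)]
table2 = [
    {"user_group": g % 10,
     "perm_type": "TEST" if g % 2 == 0 else "OTHER",
     "perm_group": g}
    for g in range(100)
]
table3 = [
    {"anc_user_group": g, "user_name": "user" if g == 4 else f"u{g}"}
    for g in range(10)
]


def hash_join(left, right, left_key, right_key):
    """Inner equi-join: build a hash table on the right side, probe with the left."""
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


# Both plans in this report push the filters below the joins, so do the same.
p = [r for r in table2 if r["perm_type"] == "TEST"]
uge = [r for r in table3 if r["user_name"] == "user"]

# Order (a): (table1 JOIN table2) JOIN table3
a_first = hash_join(table1, p, "anc_port_group", "perm_group")
a_result = hash_join(a_first, uge, "user_group", "anc_user_group")

# Order (b): (table3 JOIN table2) JOIN table1
b_first = hash_join(uge, p, "anc_user_group", "user_group")
b_result = hash_join(b_first, table1, "perm_group", "anc_port_group")

codes_a = {r["portfolio_code"] for r in a_result}
codes_b = {r["portfolio_code"] for r in b_result}

print("intermediate rows, order (a):", len(a_first))
print("intermediate rows, order (b):", len(b_first))
print("same distinct result:", codes_a == codes_b)
```

With this toy data both orders return the same distinct codes, but order (b) carries a smaller intermediate result into the second join. Whether that holds for the real tables depends on their sizes and on how selective the two predicates are.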

1) Execution Plan for Original query (a):
== Parsed Logical Plan ==
'Distinct
 'Project [unresolvedalias('pge.portfolio_code)]
  'Filter (('uge.user_name = user) && ('p.perm_type = TEST))
   'Join Inner, Some(('p.user_group = 'uge.anc_user_group))
    'Join Inner, Some(('p.perm_group = 'pge.anc_port_group))
     'UnresolvedRelation [table1], Some(pge)
     'UnresolvedRelation [table2], Some(p)
    'UnresolvedRelation [table3], Some(uge)

== Analyzed Logical Plan ==
portfolio_code: string
Distinct
 Project [portfolio_code#7]
  Filter ((user_name#12 = user) && (perm_type#9 = TEST))
   Join Inner, Some((user_group#8 = anc_user_group#11))
    Join Inner, Some((perm_group#10 = anc_port_group#5))
     Subquery pge
      Relation[anc_port_group#5,portfolio_name#6,portfolio_code#7] ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table1.parquet]
     Subquery p
      Relation[user_group#8,perm_type#9,perm_group#10] ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table2.parquet]
    Subquery uge
     Relation[anc_user_group#11,user_name#12] ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table3.parquet]

== Optimized Logical Plan ==
Aggregate [portfolio_code#7], [portfolio_code#7]
 Project [portfolio_code#7]
  Join Inner, Some((user_group#8 = anc_user_group#11))
   Project [portfolio_code#7,user_group#8]
    Join Inner, Some((perm_group#10 = anc_port_group#5))
     Project [portfolio_code#7,anc_port_group#5]
      Relation[anc_port_group#5,portfolio_name#6,portfolio_code#7] ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table1.parquet]
     Project [user_group#8,perm_group#10]
      Filter (perm_type#9 = TEST)
       Relation[user_group#8,perm_type#9,perm_group#10] ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table2.parquet]
   Project [anc_user_group#11]
    Filter (user_name#12 = user)
     Relation[anc_user_group#11,user_name#12] ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table3.parquet]

== Physical Plan ==
TungstenAggregate(key=[portfolio_code#7], functions=[], output=[portfolio_code#7])
 TungstenExchange hashpartitioning(portfolio_code#7)
  TungstenAggregate(key=[portfolio_code#7], functions=[], output=[portfolio_code#7])
   TungstenProject [portfolio_code#7]
    BroadcastHashJoin [user_group#8], [anc_user_group#11], BuildRight
     TungstenProject [portfolio_code#7,user_group#8]
      BroadcastHashJoin [anc_port_group#5], [perm_group#10], BuildRight
       ConvertToUnsafe
        Scan ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table1.parquet][portfolio_code#7,anc_port_group#5]
       ConvertToUnsafe
        Project [user_group#8,perm_group#10]
         Filter (perm_type#9 = TEST)
          Scan ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table2.parquet][user_group#8,perm_group#10,perm_type#9]
     ConvertToUnsafe
      Project [anc_user_group#11]
       Filter (user_name#12 = user)
        Scan ParquetRelation[snackfs://tst:9042/aladdin_data_beta/table3.parquet][anc_user_group#11,user_name#12]

Code Generation: true

2) Execution Plan for Optimized query (b):
== Physical Plan ==
TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
 TungstenExchange hashpartitioning(portfolio_code#14119)
  TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
   TungstenProject [portfolio_code#14119]
    BroadcastHashJoin [perm_group#13667], [anc_port_group#14117], BuildRight
     TungstenProject [perm_group#13667]
      BroadcastHashJoin [anc_user_group#13658], [user_group#13665], BuildRight
       ConvertToUnsafe
        Project [anc_user_group#13658]
         Filter (user_name#13659 = user)
          Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
       ConvertToUnsafe
        Project [perm_group#13667,user_group#13665]
         Filter (perm_type#13666 = TEST)
          Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
     ConvertToUnsafe
      Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
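
Both physical plans apply the same filters directly above the scans, so the visible difference is the order of the BroadcastHashJoin operators. A small sketch, assuming only the plan text format shown in this report, can pull the join keys out of each plan for a side-by-side comparison. The `join_keys` helper is a hypothetical name used for illustration and is not part of Spark.

```python
import re


def join_keys(plan_text):
    """Return (left_keys, right_keys) per BroadcastHashJoin, outermost first.

    Expression IDs such as #8 are stripped so that plans from different
    sessions can be compared.
    """
    pattern = re.compile(r"BroadcastHashJoin \[([^\]]*)\], \[([^\]]*)\]")

    def strip_ids(keys):
        return re.sub(r"#\d+", "", keys)

    return [(strip_ids(m.group(1)), strip_ids(m.group(2)))
            for m in pattern.finditer(plan_text)]


# Join lines copied from the two physical plans in this report.
plan_a = """
    BroadcastHashJoin [user_group#8], [anc_user_group#11], BuildRight
      BroadcastHashJoin [anc_port_group#5], [perm_group#10], BuildRight
"""
plan_b = """
    BroadcastHashJoin [perm_group#13667], [anc_port_group#14117], BuildRight
      BroadcastHashJoin [anc_user_group#13658], [user_group#13665], BuildRight
"""

# The innermost join (last in the list) runs first.
print("plan (a) first join:", join_keys(plan_a)[-1])
print("plan (b) first join:", join_keys(plan_b)[-1])
```

Read this way, plan (a) joins table1 with table2 first, while plan (b) joins the two filtered tables (table3 and table2) first and brings in table1 last, which matches the order of the tables in each "from" clause.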
 



  was:
I am trying to execute a simple query with join on 3 tables. When I look at the 
execution plan , it varies with position of table in the "from" clause of the 
query. Execution plan looks more optimized when the position of table with 
predicates is specified before any other table. 

Original query : 

select distinct pge.portfolio_code 
from table1 pge join table2 p 
on p.perm_group = pge.anc_port_group 
join table3 uge 
on p.user_group=uge.anc_user_group 
where uge.user_name = 'user' and p.perm_type = 'TEST' 

Execution Plan for original query:

== Physical Plan == 
TungstenAggregate(key=[portfolio_code#14119], functions=[], 
output=[portfolio_code#14119]) 
 TungstenExchange hashpartitioning(portfolio_code#14119) 
  TungstenAggregate(key=[portfolio_code#14119], functions=[], 
output=[portfolio_code#14119]) 
   TungstenProject [portfolio_code#14119] 
    BroadcastHashJoin [user_group#13665], [anc_user_group#13658], BuildRight 
     TungstenProject [portfolio_code#14119,user_group#13665] 
      BroadcastHashJoin [anc_port_group#14117], [perm_group#13667], BuildRight 
       ConvertToUnsafe 
        Scan 
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
 
       ConvertToUnsafe 
        Project [user_group#13665,perm_group#13667] 
         Filter (perm_type#13666 = TEST) 
          Scan 
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
 
     ConvertToUnsafe 
      Project [anc_user_group#13658] 
       Filter (user_name#13659 = user) 
        Scan 
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
 

Optimized query (table with predicates is moved ahead): 
select distinct pge.portfolio_code 
from table1 uge, table2 p, table3 pge 
where uge.user_name = 'user' and p.perm_type = 'TEST' 
and p.perm_group = pge.anc_port_group 
and p.user_group=uge.anc_user_group 

Execution Plan for  optimized query:
== Physical Plan == 
TungstenAggregate(key=[portfolio_code#14119], functions=[], 
output=[portfolio_code#14119]) 
 TungstenExchange hashpartitioning(portfolio_code#14119) 
  TungstenAggregate(key=[portfolio_code#14119], functions=[], 
output=[portfolio_code#14119]) 
   TungstenProject [portfolio_code#14119] 
    BroadcastHashJoin [perm_group#13667], [anc_port_group#14117], BuildRight 
     TungstenProject [perm_group#13667] 
      BroadcastHashJoin [anc_user_group#13658], [user_group#13665], BuildRight 
       ConvertToUnsafe 
        Project [anc_user_group#13658] 
         Filter (user_name#13659 = user) 
          Scan 
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
 
       ConvertToUnsafe 
        Project [perm_group#13667,user_group#13665] 
         Filter (perm_type#13666 = TEST) 
          Scan 
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
 
     ConvertToUnsafe 
      Scan 
ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
 




> Spark SQL Optimization is not consistent
> ----------------------------------------
>
>                 Key: SPARK-14097
>                 URL: https://issues.apache.org/jira/browse/SPARK-14097
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2, 1.6.1
>            Reporter: Gaurav Tiwari
>            Priority: Minor
>


