[ 
https://issues.apache.org/jira/browse/SPARK-11580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yadong Qi updated SPARK-11580:
------------------------------
    Description: 
I do the SQL as below:
{code}
cache table src as select * from src distribute by key;
select key, count(value) from src group by key;
{code}
and the Physical Plan is 
{code}
TungstenAggregate(key=[key#0], 
functions=[(count(value#1),mode=Final,isDistinct=false)], 
output=[key#0,_c1#28L])
 TungstenAggregate(key=[key#0], 
functions=[(count(value#1),mode=Partial,isDistinct=false)], 
output=[key#0,currentCount#41L])
  InMemoryColumnarTableScan [key#0,value#1], (InMemoryRelation [key#0,value#1], 
true, 10000, StorageLevel(true, true, false, true, 1), (TungstenExchange 
hashpartitioning(key#0)), Some(src))
{code}
I think if there is no *Exchange*, just do final aggregation is better, like:
{code}
TungstenAggregate(key=[key#0], 
functions=[(count(value#1),mode=Final,isDistinct=false)], 
output=[key#0,_c1#28L])
  InMemoryColumnarTableScan [key#0,value#1], (InMemoryRelation [key#0,value#1], 
true, 10000, StorageLevel(true, true, false, true, 1), (TungstenExchange 
hashpartitioning(key#0)), Some(src))
{code}

  was:
I do the SQL as below:
{code}
cache table src as select * from src distribute by key;
select key, count(value) from src group by key;
{code}
and the Physical Plan is 
{code}
TungstenAggregate(key=[key#0], 
functions=[(count(value#1),mode=Final,isDistinct=false)], 
output=[key#0,_c1#28L])
 TungstenAggregate(key=[key#0], 
functions=[(count(value#1),mode=Partial,isDistinct=false)], 
output=[key#0,currentCount#41L])
  InMemoryColumnarTableScan [key#0,value#1], (InMemoryRelation [key#0,value#1], 
true, 10000, StorageLevel(true, true, false, true, 1), (TungstenExchange 
hashpartitioning(key#0)), Some(src))
{code}
I think if there is no Exchange, just do final aggregation is better, like:
{code}
TungstenAggregate(key=[key#0], 
functions=[(count(value#1),mode=Final,isDistinct=false)], 
output=[key#0,_c1#28L])
  InMemoryColumnarTableScan [key#0,value#1], (InMemoryRelation [key#0,value#1], 
true, 10000, StorageLevel(true, true, false, true, 1), (TungstenExchange 
hashpartitioning(key#0)), Some(src))
{code}


> Just do final aggregation when there is no Exchange
> ---------------------------------------------------
>
>                 Key: SPARK-11580
>                 URL: https://issues.apache.org/jira/browse/SPARK-11580
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Yadong Qi
>
> I do the SQL as below:
> {code}
> cache table src as select * from src distribute by key;
> select key, count(value) from src group by key;
> {code}
> and the Physical Plan is 
> {code}
> TungstenAggregate(key=[key#0], 
> functions=[(count(value#1),mode=Final,isDistinct=false)], 
> output=[key#0,_c1#28L])
>  TungstenAggregate(key=[key#0], 
> functions=[(count(value#1),mode=Partial,isDistinct=false)], 
> output=[key#0,currentCount#41L])
>   InMemoryColumnarTableScan [key#0,value#1], (InMemoryRelation 
> [key#0,value#1], true, 10000, StorageLevel(true, true, false, true, 1), 
> (TungstenExchange hashpartitioning(key#0)), Some(src))
> {code}
> I think if there is no *Exchange*, just do final aggregation is better, like:
> {code}
> TungstenAggregate(key=[key#0], 
> functions=[(count(value#1),mode=Final,isDistinct=false)], 
> output=[key#0,_c1#28L])
>   InMemoryColumnarTableScan [key#0,value#1], (InMemoryRelation 
> [key#0,value#1], true, 10000, StorageLevel(true, true, false, true, 1), 
> (TungstenExchange hashpartitioning(key#0)), Some(src))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to