[ 
https://issues.apache.org/jira/browse/SPARK-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-13136:
--------------------------------
    Description: 
In an ideal architecture, we have a very small number of physical operators 
that handle data exchanges, and the rest simply declare the input data 
distribution needed and let the planner inject the right Exchange operators.
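
To make the idea concrete, here is a minimal Python sketch (not Spark code) of operators that only declare the distribution they need, with a planner pass that injects the Exchange. The names Operator, ensure_requirements, describe and the two distribution constants are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical distribution names, for illustration only.
UNSPECIFIED = "unspecified"
SINGLE_PARTITION = "single_partition"


@dataclass
class Operator:
    name: str
    children: List["Operator"] = field(default_factory=list)
    # Distribution this operator needs from each of its children.
    required_child_distribution: str = UNSPECIFIED
    # Distribution of the data this operator produces.
    output_distribution: str = UNSPECIFIED


def ensure_requirements(op: Operator) -> Operator:
    """Bottom-up pass: wrap a child in an Exchange whenever its output
    distribution does not satisfy what the parent declared."""
    new_children = []
    for child in op.children:
        child = ensure_requirements(child)
        needed = op.required_child_distribution
        if needed != UNSPECIFIED and child.output_distribution != needed:
            # Only the Exchange operator moves data between partitions.
            child = Operator("Exchange", [child], output_distribution=needed)
        new_children.append(child)
    op.children = new_children
    return op


def describe(op: Operator) -> str:
    """Render a plan as nested operator names."""
    inner = ", ".join(describe(c) for c in op.children)
    return f"{op.name}({inner})" if inner else op.name


scan = Operator("Scan")
agg = Operator("GlobalAggregate", [scan],
               required_child_distribution=SINGLE_PARTITION,
               output_distribution=SINGLE_PARTITION)
print(describe(ensure_requirements(agg)))
```

In this sketch GlobalAggregate never moves data itself; it only states that it needs a single partition, and the planner pass places the Exchange between it and the scan.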

We have almost that, except the following few operators:

1. Limit: does its own shuffle or collect to get data to a single partition.
2. Except: does its own shuffle; note that this operator is going away and will 
be replaced by anti-join (SPARK-12660).
3. Broadcast joins: broadcast joins do their own broadcast, which is a form of 
data exchange.

Here is a straw man for limit. Split the current Limit operator into two: a 
partition-local limit and a terminal limit. Partition-local limit is just a 
normal unary operator. The terminal limit requires the input data distribution 
to be a single partition, and then takes its own limit. We then update the 
planner (strategies) to turn a logical limit into a partition-local limit and a 
terminal limit.
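
The straw man can be sketched in a few lines of Python, modelling a distributed dataset as a list of partitions. This is an illustration under stated assumptions, not the Spark implementation: the function names are hypothetical, and exchange_to_single_partition stands in for the Exchange that the planner would inject to satisfy the terminal limit's single-partition requirement.

```python
from typing import List

Partition = List[int]


def local_limit(partitions: List[Partition], n: int) -> List[Partition]:
    """Partition-local limit: a plain unary operator that keeps at most
    n rows of each partition and moves no data."""
    return [p[:n] for p in partitions]


def exchange_to_single_partition(partitions: List[Partition]) -> List[Partition]:
    """Stand-in for the planner-injected Exchange: gather all rows into
    one partition (here simply concatenated in partition order)."""
    return [[row for p in partitions for row in p]]


def terminal_limit(partitions: List[Partition], n: int) -> List[Partition]:
    """Terminal limit: requires a single input partition, then takes
    its own limit."""
    if len(partitions) != 1:
        raise ValueError("terminal limit requires a single input partition")
    return [partitions[0][:n]]


def plan_limit(partitions: List[Partition], n: int) -> List[Partition]:
    """What the planner would produce for a logical limit:
    local limit, then exchange, then terminal limit."""
    limited = local_limit(partitions, n)
    gathered = exchange_to_single_partition(limited)
    return terminal_limit(gathered, n)


data = [[1, 2, 3], [4, 5, 6], [7]]
print(plan_limit(data, 2))
```

Because the partition-local limit runs before the exchange, each partition sends at most n rows across the network, and neither limit operator performs any data movement of its own.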

For broadcast join, it is more involved. We would need to design the interface 
for the physical operators (e.g. we are no longer taking an iterator as input 
on the probe side), and allow Exchange to handle data broadcast.

Note that this is an important step towards creating a clear delineation 
between distributed query execution and single-threaded query execution.


  was:
In an ideal architecture, we have a very small number of physical operators 
that handle data exchanges, and the rest simply declare the input data 
distribution needed and let the planner inject the right Exchange operators.

We have almost that, except the following few operators:

1. Limit: does its own shuffle or collect to get data to a single partition.
2. Except: does its own shuffle; note that this operator is going away and will 
be replaced by anti-join (SPARK-12660).
3. Broadcast joins: broadcast joins do their own broadcast, which is a form of 
data exchange.

Here is a straw man for limit. Split the current Limit operator into two: a 
partition-local limit and a terminal limit. Partition-local limit is just a 
normal unary operator. The terminal limit requires the input data distribution 
to be a single partition, and then takes its own limit. We then update the 
planner (strategies) to turn a logical limit into a partition-local limit and a 
terminal limit.

For broadcast join, it is more involved. We would need to design the interface 
for the physical operators (e.g. we are no longer taking an iterator as input 
on the probe side), and allow Exchange to handle data broadcast.



> Data exchange (shuffle, broadcast) should only be handled by the exchange 
> operator
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-13136
>                 URL: https://issues.apache.org/jira/browse/SPARK-13136
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Reynold Xin
>
> In an ideal architecture, we have a very small number of physical operators 
> that handle data exchanges, and the rest simply declare the input data 
> distribution needed and let the planner inject the right Exchange operators.
> We have almost that, except the following few operators:
> 1. Limit: does its own shuffle or collect to get data to a single partition.
> 2. Except: does its own shuffle; note that this operator is going away and 
> will be replaced by anti-join (SPARK-12660).
> 3. Broadcast joins: broadcast joins do their own broadcast, which is a form of 
> data exchange.
> Here is a straw man for limit. Split the current Limit operator into two: a 
> partition-local limit and a terminal limit. Partition-local limit is just a 
> normal unary operator. The terminal limit requires the input data 
> distribution to be a single partition, and then takes its own limit. We then 
> update the planner (strategies) to turn a logical limit into a 
> partition-local limit and a terminal limit.
> For broadcast join, it is more involved. We would need to design the 
> interface for the physical operators (e.g. we are no longer taking an 
> iterator as input on the probe side), and allow Exchange to handle data 
> broadcast.
> Note that this is an important step towards creating a clear delineation 
> between distributed query execution and single-threaded query execution.


