[ 
https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove updated ARROW-9464:
------------------------------
    Description: 
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.
h3. *Introduce physical plan optimization rule to insert "shuffle" operators*

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartitioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
    Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
    Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option<Vec<SortOrder>> {
    None
}

/// Specifies the sort ordering requirements of all the children for this operator
fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
    None
}
{code}
A good example of applying this rule would be in the case of hash aggregates 
where we perform a partial aggregate in parallel across partitions and then 
coalesce the results and apply a final hash aggregate.
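
To illustrate how such a rule could be applied, here is a rough sketch (this is 
not existing DataFusion code: the PhysicalOptimizerRule trait, 
Distribution::SinglePartition, MergeExec, and the children / with_new_children / 
partition_count methods are all assumptions made for the example) of a pass that 
walks the plan and coalesces partitions wherever an operator requires a single 
partition:
{code:java}
use std::sync::Arc;

/// Hypothetical physical optimizer rule trait, mirroring the logical optimizer.
trait PhysicalOptimizerRule {
    fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>>;
}

/// Inserts a merge step between a child producing many partitions and a parent
/// that requires a single partition (e.g. a final HashAggregateExec).
struct AddMergeExec {}

impl PhysicalOptimizerRule for AddMergeExec {
    fn optimize(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        // Rewrite bottom-up so every subtree is already optimized.
        let children = plan
            .children()
            .iter()
            .map(|child| self.optimize(child.clone()))
            .collect::<Result<Vec<_>>>()?;

        // Coalesce partitions where the parent's required distribution is not met.
        let new_children: Vec<Arc<dyn ExecutionPlan>> = children
            .into_iter()
            .map(|child| match plan.required_child_distribution() {
                Distribution::SinglePartition
                    if child.output_partitioning().partition_count() > 1 =>
                {
                    Arc::new(MergeExec::new(child)) as Arc<dyn ExecutionPlan>
                }
                _ => child,
            })
            .collect();

        plan.with_new_children(new_children)
    }
}
{code}
Applied to the hash aggregate case above, a rule along these lines would rewrite 
partial aggregate -> final aggregate into partial aggregate -> merge -> final 
aggregate automatically.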

Another example would be a SortMergeExec specifying the sort order required for 
its children.
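
As a sketch of what that would look like (the sort_order field is hypothetical 
and the other ExecutionPlan methods are omitted for brevity), SortMergeExec 
would just override the new methods:
{code:java}
impl ExecutionPlan for SortMergeExec {
    // ... other ExecutionPlan methods omitted for brevity ...

    /// Each child must already be sorted on the merge keys; `sort_order` is a
    /// hypothetical field on SortMergeExec holding that required order.
    fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
        // one entry per child input
        Some(vec![self.sort_order.clone(), self.sort_order.clone()])
    }

    /// Merging already-sorted inputs preserves that order in the output.
    fn output_ordering(&self) -> Option<Vec<SortOrder>> {
        Some(self.sort_order.clone())
    }
}
{code}
The optimization rule could then compare each child's output_ordering against 
this requirement and insert a sort step only when they do not match.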

 

 

  was:
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.
h3. *Introduce enum to represent physical plan.*

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and to apply transformations to them.
{code:java}
 pub enum PhysicalPlan {
    /// Projection.
    Projection(Arc<ProjectionExec>),
    /// Filter a.k.a predicate.
    Filter(Arc<FilterExec>),
    /// Hash aggregate
    HashAggregate(Arc<HashAggregateExec>),
    /// Performs a hash join of two child relations by first shuffling the data using the join keys.
    ShuffledHashJoin(Arc<ShuffledHashJoinExec>),
    /// Performs a shuffle that will result in the desired partitioning.
    ShuffleExchange(Arc<ShuffleExchangeExec>),
    /// Reads results from a ShuffleExchange
    ShuffleReader(Arc<ShuffleReaderExec>),
    /// Scans a partitioned data source
    ParquetScan(Arc<ParquetScanExec>),
    /// Scans an in-memory table
    InMemoryTableScan(Arc<InMemoryTableScanExec>),
}
{code}
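
To show why this helps (a sketch only: the children() helper used below is an 
assumption, not part of the proposal), a pretty-printer becomes a single 
recursive match over the variants, and transformation rules can pattern-match 
and rebuild the tree in the same way:
{code:java}
/// Formats a physical plan as an indented tree, one operator per line.
fn format_plan(plan: &PhysicalPlan, indent: usize) -> String {
    // Map each enum variant to a display name.
    let name = match plan {
        PhysicalPlan::Projection(_) => "ProjectionExec",
        PhysicalPlan::Filter(_) => "FilterExec",
        PhysicalPlan::HashAggregate(_) => "HashAggregateExec",
        PhysicalPlan::ShuffledHashJoin(_) => "ShuffledHashJoinExec",
        PhysicalPlan::ShuffleExchange(_) => "ShuffleExchangeExec",
        PhysicalPlan::ShuffleReader(_) => "ShuffleReaderExec",
        PhysicalPlan::ParquetScan(_) => "ParquetScanExec",
        PhysicalPlan::InMemoryTableScan(_) => "InMemoryTableScanExec",
    };
    let mut output = format!("{}{}\n", "  ".repeat(indent), name);
    // `children()` is a hypothetical helper returning the child plans of this node.
    for child in plan.children() {
        output.push_str(&format_plan(child, indent + 1));
    }
    output
}
{code}
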
h3. *Introduce physical plan optimization rule to insert "shuffle" operators*

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartitioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
    Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
    Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option<Vec<SortOrder>> {
    None
}

/// Specifies the sort ordering requirements of all the children for this operator
fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
    None
}
{code}
A good example of applying this rule would be in the case of hash aggregates 
where we perform a partial aggregate in parallel across partitions and then 
coalesce the results and apply a final hash aggregate.

Another example would be a SortMergeExec specifying the sort order required for 
its children.

 

 


> [Rust] [DataFusion] Physical plan refactor to support optimization rules and 
> more efficient use of threads
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-9464
>                 URL: https://issues.apache.org/jira/browse/ARROW-9464
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> I would like to propose a refactor of the physical/execution planning based 
> on the experience I have had in implementing distributed execution in 
> Ballista.
> This will likely need subtasks but here is an overview of the changes I am 
> proposing.
> h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
> We should extend the ExecutionPlan trait so that each operator can specify 
> its input and output partitioning needs, and then have an optimization rule 
> that can insert any repartitioning or reordering steps required.
> For example, these are the methods to be added to ExecutionPlan. This design 
> is based on Apache Spark.
>  
> {code:java}
> /// Specifies how data is partitioned across different nodes in the cluster
> fn output_partitioning(&self) -> Partitioning {
>     Partitioning::UnknownPartitioning(0)
> }
> /// Specifies the data distribution requirements of all the children for this operator
> fn required_child_distribution(&self) -> Distribution {
>     Distribution::UnspecifiedDistribution
> }
> /// Specifies how data is ordered in each partition
> fn output_ordering(&self) -> Option<Vec<SortOrder>> {
>     None
> }
> /// Specifies the sort ordering requirements of all the children for this operator
> fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
>     None
> }
>  {code}
> A good example of applying this rule would be in the case of hash aggregates 
> where we perform a partial aggregate in parallel across partitions and then 
> coalesce the results and apply a final hash aggregate.
> Another example would be a SortMergeExec specifying the sort order required 
> for its children.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
