[ 
https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176442#comment-17176442
 ] 

Andy Grove commented on ARROW-9464:
-----------------------------------

I did a slightly better job of explaining this in 
https://issues.apache.org/jira/browse/ARROW-9707

"The current threading model is very simple and does not scale. We currently 
use 1-2 dedicated threads per partition and they all run simultaneously, which 
is a huge problem if you have more partitions than logical or physical cores.
This task is to re-implement the threading model so that query execution uses a 
fixed (configurable) number of threads. Work will be broken down into stages 
and tasks and each in-process executor (running on a dedicated thread) will 
process its queue of tasks.

This process will be driven by a scheduler."

> [Rust] [DataFusion] Physical plan refactor to support optimization rules and 
> more efficient use of threads
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-9464
>                 URL: https://issues.apache.org/jira/browse/ARROW-9464
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>
> I would like to propose a refactor of the physical/execution planning based 
> on the experience I have had in implementing distributed execution in 
> Ballista.
> This will likely need subtasks but here is an overview of the changes I am 
> proposing.
> h3. *Introduce enum to represent physical plan.*
> By wrapping the execution plan structs in an enum, we make it possible to 
> build a tree representing the physical plan just like we do with the logical 
> plan. This makes it easy to print physical plans and also to apply 
> transformations to it.
> {code:java}
>  pub enum PhysicalPlan {
>     /// Projection.
>     Projection(Arc<ProjectionExec>),
>     /// Filter a.k.a predicate.
>     Filter(Arc<FilterExec>),
>     /// Hash aggregate
>     HashAggregate(Arc<HashAggregateExec>),
>     /// Performs a hash join of two child relations by first shuffling the 
> data using the join keys.
>     ShuffledHashJoin(ShuffledHashJoinExec),
>     /// Performs a shuffle that will result in the desired partitioning.
>     ShuffleExchange(Arc<ShuffleExchangeExec>),
>     /// Reads results from a ShuffleExchange
>     ShuffleReader(Arc<ShuffleReaderExec>),
>     /// Scans a partitioned data source
>     ParquetScan(Arc<ParquetScanExec>),
>     /// Scans an in-memory table
>     InMemoryTableScan(Arc<InMemoryTableScanExec>),
> }{code}
> h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
> We should extend the ExecutionPlan trait so that each operator can specify 
> its input and output partitioning needs, and then have an optimization rule 
> that can insert any repartioning or reordering steps required.
> For example, these are the methods to be added to ExecutionPlan. This design 
> is based on Apache Spark.
>  
> {code:java}
> /// Specifies how data is partitioned across different nodes in the cluster
> fn output_partitioning(&self) -> Partitioning {
>     Partitioning::UnknownPartitioning(0)
> }
> /// Specifies the data distribution requirements of all the children for this 
> operator
> fn required_child_distribution(&self) -> Distribution {
>     Distribution::UnspecifiedDistribution
> }
> /// Specifies how data is ordered in each partition
> fn output_ordering(&self) -> Option<Vec<SortOrder>> {
>     None
> }
> /// Specifies the data distribution requirements of all the children for this 
> operator
> fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
>     None
> }
>  {code}
> A good example of applying this rule would be in the case of hash aggregates 
> where we perform a partial aggregate in parallel across partitions and then 
> coalesce the results and apply a final hash aggregate.
> Another example would be a SortMergeExec specifying the sort order required 
> for its children.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to