[ 
https://issues.apache.org/jira/browse/ARROW-6659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948961#comment-16948961
 ] 

Kyle McCarthy commented on ARROW-6659:
--------------------------------------

Does the LogicalPlan need to know about partitioning? I was able to just move 
the logic into the physical query planner and it seems to work... I am not sure 
if this is what you had in mind though. [You can see what I mean 
here|https://github.com/kyle-mccarthy/arrow/blob/3596cf417b8420b51f265c980aace7292c6134d8/rust/datafusion/src/execution/context.rs#L275].

Originally I was thinking about adding a `LogicalPlan::Merge` variant and 
adding some boolean value onto the `LogicalPlan::Aggregate` to indicate if the 
input was partitioned. It seemed like it could just overcomplicate the process 
though. 

> [Rust] [DataFusion] Refactor of HashAggregateExec to support custom merge
> -------------------------------------------------------------------------
>
>                 Key: ARROW-6659
>                 URL: https://issues.apache.org/jira/browse/ARROW-6659
>             Project: Apache Arrow
>          Issue Type: Sub-task
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Kyle McCarthy
>            Priority: Major
>
> HashAggregateExec current creates one HashPartition per input partition for 
> the initial aggregate per partition, and then explicitly calls MergeExec and 
> then creates another HashPartition for the final reduce operation.
> This is fine for in-memory queries in DataFusion but is not extensible. For 
> example, it is not possible to provide a different MergeExec implementation 
> that would distribute queries to a cluster.
> A better design would be to move the logic into the query planner so that the 
> physical plan contains explicit steps such as:
>  
> {code:java}
> - HashAggregate // final aggregate
>   - MergeExec
>     - HashAggregate // aggregate per partition
>  {code}
> This would then make it easier to customize the plan in other projects, to 
> support distributed execution:
> {code:java}
>  - HashAggregate // final aggregate
>    - MergeExec
>       - DistributedExec
>          - HashAggregate // aggregate per partition{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to