mingmwang opened a new issue, #284:
URL: https://github.com/apache/arrow-ballista/issues/284

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   A clear and concise description of what the problem is. Ex. I'm always 
frustrated when [...] 
   (This section helps Arrow developers understand the context and *why* for 
this feature, in addition to  the *what*)
   
   In current Ballista code base, when it generates the distributed plan, it 
will remove any non-hash repartition from the distributed plan.  
   And In DataFusion, when it does the physical planning, it added 
RepartitionExec node blindly when it sees the hash join or aggregation without 
considering the children's output partitioning.
   
   
   ````
               match repart.output_partitioning() {
                   Partitioning::Hash(_, _) => {
                       let shuffle_writer = create_shuffle_writer(
                           job_id,
                           self.next_stage_id(),
                           children[0].clone(),
                           Some(repart.partitioning().to_owned()),
                       )?;
                       let unresolved_shuffle = 
Arc::new(UnresolvedShuffleExec::new(
                           shuffle_writer.stage_id(),
                           shuffle_writer.schema(),
                           
shuffle_writer.output_partitioning().partition_count(),
                           shuffle_writer
                               .shuffle_output_partitioning()
                               .map(|p| p.partition_count())
                               .unwrap_or_else(|| {
                                   
shuffle_writer.output_partitioning().partition_count()
                               }),
                       ));
                       stages.push(shuffle_writer);
                       Ok((unresolved_shuffle, stages))
                   }
                   _ => {
                       // remove any non-hash repartition from the distributed 
plan
                       Ok((children[0].clone(), stages))
                   }
               }
   
   ````
   
   When I look into Presto's source code,  presto's distributed plan can 
includes both remote exchanges and local exchanges.
   Local exchange can benefit the inner Stage parallelism. Presto can add the 
remote exchanges and local exchanges only when necessary.  I think it is time 
to introduce more advanced methods to reason the partitioning in a distributed 
plan, something more powerful than Spark SQL EnsureRequirements rule
   
   Incorporating Partitioning and Parallel Plans into the SCOPE Optimizer
   http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf
   
   **Describe the solution you'd like**
   A clear and concise description of what you want to happen.
   
   **Describe alternatives you've considered**
   A clear and concise description of any alternative solutions or features 
you've considered.
   
   **Additional context**
   Add any other context or screenshots about the feature request here.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to