niebayes commented on issue #5173:
URL: https://github.com/apache/datafusion/issues/5173#issuecomment-2670953355

   @Blizzara Thanks for your reply. I initially chose the physical plan 
because more of the computation can be distributed to executors in a 
distributed query engine.
   
   Take this SQL query as an example:
   ```
   select avg(value) from sx1 group by sid having sid > 1;
   ```
   
   The corresponding logical plan might be:
   ```
   Projection: avg(sx1.value)
     Aggregate: groupBy=[[sx1.sid]], aggr=[[avg(CAST(sx1.value AS Float64))]]
       Filter: sx1.sid > Int8(1)
         TableScan: sx1 projection=[sid, value], partial_filters=[sx1.sid > Int8(1)]
   ```
   
   And the physical plan might look like:
   ```
   ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
     AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
       CoalesceBatchesExec: target_batch_size=8192
         RepartitionExec: partitioning=Hash([sid@0], 8), input_partitions=8
           AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
             CoalesceBatchesExec: target_batch_size=8192
               FilterExec: sid@0 > 1
                 ParquetExec: file_groups = [..]
   ```
   
   From the datafusion-ballista project, I learned that we can split the 
execution plan at pipeline breakers (including RepartitionExec, 
SortPreservingMergeExec, CoalescePartitionsExec, etc.). So the above execution 
plan would be split into two parts (i.e. pipelines):
   
   ```
   ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
     AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
       CoalesceBatchesExec: target_batch_size=8192
         MergePipelinesExec: pipeline_ids = [...]
   ```
   
   
   ```
             AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
               CoalesceBatchesExec: target_batch_size=8192
                 FilterExec: sid@0 > 1
                   ParquetExec: file_groups = [..]
   ```
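
   The splitting step can be sketched roughly as follows. This is a toy model, 
not DataFusion or Ballista code: `PlanNode`, `is_pipeline_breaker`, `cut`, and 
`split_into_pipelines` are hypothetical names, and the plan is a plain tree of 
operator names rather than `ExecutionPlan` objects. Each pipeline breaker is 
replaced by a `MergePipelinesExec` placeholder (the name used in the plans 
above), and the subtree below it becomes its own pipeline.

```rust
// Toy plan node: a hypothetical stand-in for a physical plan operator.
#[derive(Debug, Clone, PartialEq)]
struct PlanNode {
    name: String,
    children: Vec<PlanNode>,
}

impl PlanNode {
    fn new(name: &str, children: Vec<PlanNode>) -> Self {
        PlanNode { name: name.to_string(), children }
    }
}

// Operators treated as pipeline breakers in this sketch (assumed list).
fn is_pipeline_breaker(name: &str) -> bool {
    matches!(
        name,
        "RepartitionExec" | "SortPreservingMergeExec" | "CoalescePartitionsExec"
    )
}

// Rewrites `node` top-down. A breaker is replaced by a placeholder leaf that
// records the ids of the pipelines feeding it; each input subtree of the
// breaker is pushed onto `pipelines` as a separate pipeline.
fn cut(node: &PlanNode, pipelines: &mut Vec<PlanNode>) -> PlanNode {
    if is_pipeline_breaker(&node.name) {
        let mut ids = Vec::new();
        for child in &node.children {
            let upstream = cut(child, pipelines);
            pipelines.push(upstream);
            ids.push((pipelines.len() - 1).to_string());
        }
        let name = format!("MergePipelinesExec: pipeline_ids=[{}]", ids.join(", "));
        return PlanNode::new(&name, vec![]);
    }
    let children: Vec<PlanNode> =
        node.children.iter().map(|c| cut(c, pipelines)).collect();
    PlanNode { name: node.name.clone(), children }
}

// Returns all pipelines, upstream pipelines first; the last one is the root.
fn split_into_pipelines(root: &PlanNode) -> Vec<PlanNode> {
    let mut pipelines = Vec::new();
    let top = cut(root, &mut pipelines);
    pipelines.push(top);
    pipelines
}
```

   Calling `split_into_pipelines` on a tree shaped like the physical plan above 
yields two pipelines: the partial-aggregation subtree, and the root pipeline 
ending in the `MergePipelinesExec` placeholder.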
   
   With this split, the first stage of the parallel aggregation can be 
distributed across multiple executors, which improves resource utilization. 
   If we split the logical plan instead, it seems we can't distribute the 
aggregation, not even part of it, because the logical plan has a single 
Aggregate node and the Partial/Final split only appears in the physical plan. 
I'm not sure my understanding is right.
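
   To make the two-stage aggregation concrete, here is a minimal sketch in 
plain Rust with no DataFusion types. `AvgState`, `partial_avg`, and `final_avg` 
are hypothetical names. It assumes the partial state for `avg` is a 
(sum, count) pair per group: each executor computes that state over its local 
rows, and the final stage merges the states and divides.

```rust
use std::collections::HashMap;

// Partial aggregation state for avg: running sum and row count (assumed layout).
#[derive(Debug, Clone, Copy, PartialEq)]
struct AvgState {
    sum: f64,
    count: u64,
}

// Partial stage: runs on each executor over its local (sid, value) rows.
fn partial_avg(rows: &[(i64, f64)]) -> HashMap<i64, AvgState> {
    let mut states: HashMap<i64, AvgState> = HashMap::new();
    for &(sid, value) in rows {
        // Mirrors the filter `sid > 1` applied before aggregation.
        if sid <= 1 {
            continue;
        }
        let s = states.entry(sid).or_insert(AvgState { sum: 0.0, count: 0 });
        s.sum += value;
        s.count += 1;
    }
    states
}

// Final stage: merges the partial states from all executors, then divides.
fn final_avg(partials: Vec<HashMap<i64, AvgState>>) -> HashMap<i64, f64> {
    let mut merged: HashMap<i64, AvgState> = HashMap::new();
    for partial in partials {
        for (sid, state) in partial {
            let m = merged.entry(sid).or_insert(AvgState { sum: 0.0, count: 0 });
            m.sum += state.sum;
            m.count += state.count;
        }
    }
    merged
        .into_iter()
        .map(|(sid, s)| (sid, s.sum / s.count as f64))
        .collect()
}
```

   Only the small per-group states cross the network between the two stages, 
which is why running the partial stage on the executors is attractive.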
   
   By the way, datafusion-ballista works well for OLAP workloads, and it 
assumes executors are stateless. In my scenario, however, executors are 
stateful: each executor maintains an in-memory buffer containing the most 
recently written data (historical data is stored in shared object storage). So 
when the scheduler is about to construct a physical plan, it has to query each 
executor for the latest statistics, which are required for query optimization. 
I wonder whether this is the standard approach to distributed queries on top 
of DataFusion, since the implementation seems complicated.
   
   I really hope the DataFusion community can provide some recommendations on 
building a distributed query engine based on DataFusion.
   



