yjshen edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1078958940


   Thanks for bringing it up. 
   
   Recently, we've encountered several different circumstances that call for better control over query execution parallelism:
   - Parallelizing the Parquet file scan and reducing filesystem operation overhead (the current parquet `ChunkReader` API).
   - Incorporating the task concept, widely used in distributed computation frameworks such as Spark and Presto, into the DataFusion core: https://github.com/apache/arrow-datafusion/pull/1987
   - An improved scheduler that limits concurrency (i.e., don't scan 20 Parquet files in parallel if you have only two cores), [from Influx](https://github.com/influxdata/influxdb_iox/issues/3994).
   - Several related earlier issues, such as #64 and #924.
   
   I think it might be the right time to rethink how to divide query working 
sets, how to partition/repartition data among operators, and how we should 
schedule tasks.
   
   My opinion on the overall scheduler and execution framework (partly proposed in https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825384514 and in our roadmap) is:
   - Adopt stage-based execution in the DataFusion core (a minimal sketch follows this list).
       - Divide the query execution plan into a DAG of stages, cutting at "exchange" operators or "pipeline breakers".
       - For each stage, group all of its operators into a `Task`; inside a task, data flows through the operator logic serially, synchronously, and pipelined.
       - For the root stages that read data from files directly, partition the input dataset based on a per-task size configuration (similar to `input.split.maxSize` in MapReduce and its counterparts in Spark/Presto).
       - For non-root stages, we could either adopt a fixed `num_partition` or determine the number of partitions from data sizes observed at runtime.
   
   - A shared scheduler framework for both DataFusion and Ballista.
     - Schedule tasks based on stage dependencies and on the number of available cores (see the scheduling sketch further below).
   
   - Ultimately, a finer-grained execution model for the DataFusion core, as described in [Morsel-driven parallelism](https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf) and [Push versus pull-based loop fusion](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf).
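   To make the stage-splitting idea concrete, here is a minimal, self-contained sketch. `PlanNode` and `Stage` are hypothetical stand-ins (the real plan abstraction would be DataFusion's `ExecutionPlan` trait); the point is only the traversal that cuts a new stage at every exchange:
   
   ```rust
   // Hypothetical plan representation; DataFusion's real abstraction is the
   // `ExecutionPlan` trait. Only the shape of the traversal matters here.
   #[derive(Debug)]
   enum PlanNode {
       Scan { files: Vec<String> },
       Filter { input: Box<PlanNode> },
       HashAggregate { input: Box<PlanNode> },
       // An exchange/repartition boundary, i.e., a pipeline breaker.
       Exchange { input: Box<PlanNode> },
   }
   
   #[derive(Debug)]
   struct Stage {
       id: usize,
       operators: Vec<String>, // run serially, pipelined, inside one `Task`
       depends_on: Vec<usize>, // edges of the stage DAG
   }
   
   /// Walk the plan bottom-up, cutting a new stage at every `Exchange`.
   fn split_into_stages(plan: &PlanNode, stages: &mut Vec<Stage>) -> usize {
       match plan {
           PlanNode::Scan { files } => {
               let id = stages.len();
               stages.push(Stage {
                   id,
                   operators: vec![format!("Scan({} files)", files.len())],
                   depends_on: vec![],
               });
               id
           }
           // Not a breaker: stay in the child's stage and extend its pipeline.
           PlanNode::Filter { input } => {
               let id = split_into_stages(input, stages);
               stages[id].operators.push("Filter".to_string());
               id
           }
           PlanNode::HashAggregate { input } => {
               let id = split_into_stages(input, stages);
               stages[id].operators.push("HashAggregate".to_string());
               id
           }
           // Breaker: the child subtree becomes a finished upstream stage and
           // the consumer side starts a fresh stage that depends on it.
           PlanNode::Exchange { input } => {
               let child = split_into_stages(input, stages);
               let id = stages.len();
               stages.push(Stage {
                   id,
                   operators: vec!["ExchangeInput".to_string()],
                   depends_on: vec![child],
               });
               id
           }
       }
   }
   
   fn main() {
       // scan -> filter -> exchange -> aggregate
       let plan = PlanNode::HashAggregate {
           input: Box::new(PlanNode::Exchange {
               input: Box::new(PlanNode::Filter {
                   input: Box::new(PlanNode::Scan {
                       files: vec!["a.parquet".into(), "b.parquet".into()],
                   }),
               }),
           }),
       };
       let mut stages = Vec::new();
       split_into_stages(&plan, &mut stages);
       for s in &stages {
           println!("stage {} runs {:?}, depends on {:?}", s.id, s.operators, s.depends_on);
       }
   }
   ```
   
   For this plan the traversal yields two stages: stage 0 runs `Scan` plus `Filter`, and stage 1 runs the aggregate and depends on stage 0; that dependency DAG is exactly what a stage-aware scheduler would consume.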
   
   
   With the approach proposed above, we could also achieve:
    >   limit concurrently (don't scan 20 parquet files in parallel if you have only two cores)
   
   with the help of the TaskScheduler (sketched below), and achieve:
   
     > can instead construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter
   
   by running an existing, sequentially executed `Task`.
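   On the concurrency limit, here is a minimal sketch of the intended behavior, assuming a tokio runtime (which DataFusion already uses); `scan_file` and the file list are hypothetical stand-ins, and a real TaskScheduler would dispatch whole tasks rather than bare file scans:
   
   ```rust
   // Requires tokio = { version = "1", features = ["full"] } in Cargo.toml.
   use std::sync::Arc;
   use tokio::sync::Semaphore;
   
   // Hypothetical stand-in for executing one scan task.
   async fn scan_file(path: String) {
       println!("scanning {path}");
   }
   
   #[tokio::main]
   async fn main() {
       // 20 input files, but only as many concurrent scans as we have cores.
       let files: Vec<String> = (0..20).map(|i| format!("part-{i}.parquet")).collect();
       let cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(2);
       let permits = Arc::new(Semaphore::new(cores));
   
       let mut handles = Vec::new();
       for file in files {
           let permits = Arc::clone(&permits);
           handles.push(tokio::spawn(async move {
               // Wait here until a "core slot" frees up.
               let _permit = permits.acquire_owned().await.expect("semaphore closed");
               scan_file(file).await;
               // The permit is released when `_permit` drops at the end of the task.
           }));
       }
       for handle in handles {
           handle.await.expect("scan task panicked");
       }
   }
   ```
   
   The same permit-per-core idea generalizes from file scans to whole `Task`s once the stage DAG above is in place.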
   
   
   cc @houqp @andygrove @liukun4515 @yahoNanJing @mingmwang. 
   

