thinkharderdev opened a new issue, #332:
URL: https://github.com/apache/arrow-ballista/issues/332

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   A clear and concise description of what the problem is. Ex. I'm always 
frustrated when [...] 
   (This section helps Arrow developers understand the context and *why* for 
this feature, in addition to  the *what*)
   
   Currently, the Ballista scheduler treats each task slot as effectively its own machine, but in practice an executor may have one task slot per CPU core. So in a cluster of 12 executor nodes, each with 12 cores, the scheduler would treat them as 144 independent machines. This introduces a lot of unnecessary inefficiency for some common queries.
   
   To take a simple example, for a query like `SELECT * FROM table WHERE foo = 'bar' LIMIT 1000` over 144 partitions, we would break this into a two-stage query:
   
   ```
   Stage 2:
   GlobalLimitExec: fetch=1000
     ShuffleReaderExec

   Stage 1:
   ShuffleWriterExec
     LocalLimitExec
       FilterExec
         ParquetExec
   ```
   
   Each partition would read 1000 rows and then we would schedule the second stage, which would stop after reading one of the 144 output partitions from the previous stage. So we've effectively read 144k rows when we only need 1k.
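
   For reference, the per-partition plan that stage 1 runs boils down to a `LocalLimitExec` over the filtered scan; here is a minimal sketch using DataFusion's operators (the `filtered_scan` input and the helper name are just placeholders, and the Ballista `ShuffleWriterExec` wrapper is omitted):

   ```rust
   use std::sync::Arc;
   use datafusion::physical_plan::{limit::LocalLimitExec, ExecutionPlan};

   /// Stage 1, per partition: each task applies its own fetch=1000 limit, so
   /// 144 independent tasks can return up to 144 * 1000 = 144k rows in total.
   fn stage_one_partition_plan(filtered_scan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
       Arc::new(LocalLimitExec::new(filtered_scan, 1000))
   }
   ```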
   
   While it's not possible to eliminate this issue entirely in a distributed query, I think we can do better than the current approach.
   
   **Describe the solution you'd like**
   A clear and concise description of what you want to happen.
   
   If the scheduler is able to schedule multiple partitions on the same node (e.g. multiple task slots on a single executor), then it can rewrite the plan to coalesce those tasks into a `TaskGroup` which can be executed more efficiently with existing constructs in DataFusion. For example, for the query above, if all 12 task slots are available on a given executor, then we can take those twelve tasks and schedule them as a single execution plan:
   
   ```
   GlobalLimitExec: fetch=1000
     CoalescePartitionsExec
       LocalLimitExec: fetch=1000
         FilterExec
           ParquetExec
   ```
   
   This can be wrapped in a `TaskGroup` data structure that (simplified) looks 
something like:
   
   ```rust
   use std::sync::Arc;
   use datafusion::physical_plan::ExecutionPlan;

   /// A group of tasks from one stage scheduled together on a single executor.
   struct TaskGroup {
       /// The input partitions of the stage covered by this group.
       partitions: Vec<usize>,
       /// The coalesced plan executed once for the whole group.
       plan: Arc<dyn ExecutionPlan>,
   }
   ```
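
   As a rough sketch, the `plan` for such a group could be assembled from existing DataFusion operators. The helper name below is just illustrative, and the exact constructor signatures vary a bit across DataFusion versions:

   ```rust
   use std::sync::Arc;
   use datafusion::physical_plan::{
       coalesce_partitions::CoalescePartitionsExec,
       limit::{GlobalLimitExec, LocalLimitExec},
       ExecutionPlan,
   };

   /// Build the coalesced plan for a task group: limit each partition, merge the
   /// executor's partitions into a single stream, then apply the limit once more
   /// over the merged stream.
   fn build_group_plan(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Arc<dyn ExecutionPlan> {
       let local = Arc::new(LocalLimitExec::new(input, fetch));
       let merged = Arc::new(CoalescePartitionsExec::new(local));
       Arc::new(GlobalLimitExec::new(merged, /* skip */ 0, Some(fetch)))
   }
   ```

   The scheduler would then hand the whole group to the executor as one task, with `partitions` recording which input partitions of the stage it covers.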
   
   Then the stage 2 plan needs to be adjusted since it will read a smaller 
number of input partitions. 
   
   Alternatively, we can generalize the existing data structures to handle this case.
   
   **Describe alternatives you've considered**
   A clear and concise description of any alternative solutions or features 
you've considered.
   
   We could leave things as is.
   
   **Additional context**
   Add any other context or screenshots about the feature request here.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
