thinkharderdev opened a new issue, #332:
URL: https://github.com/apache/arrow-ballista/issues/332
**Is your feature request related to a problem or challenge? Please describe
what you are trying to do.**
Currently, the Ballista scheduler treats each task slot as effectively its own machine, but in practice an executor may have one task slot per CPU core. So in a cluster with 12 executor nodes, each with 12 cores, the scheduler would treat them as 144 independent machines. This adds a lot of unnecessary inefficiency to some common queries.
To take a simple example, a query like `SELECT * FROM table WHERE foo = 'bar' LIMIT 1000` over 144 partitions would be broken into a two-stage query:

Stage 2:

    GlobalLimitExec: fetch=1000
      ShuffleReaderExec

Stage 1:

    ShuffleWriterExec
      LocalLimitExec
        FilterExec
          ParquetExec

Each partition would read 1000 rows, and then we would schedule the second stage, which would stop after reading one of the 144 output partitions from the previous stage. So we've effectively read 144,000 rows when we only need 1,000.
While it's not possible to eliminate this issue entirely in a distributed query, I think we can do better than the current approach.
**Describe the solution you'd like**
If the scheduler is able to schedule multiple partitions on the same node (e.g. multiple task slots on a single executor), then it can rewrite the plan to coalesce those tasks into a `TaskGroup`, which can be executed more efficiently with existing constructs in DataFusion. For example, for the query above, if all 12 task slots are available on a given executor, then we can take those twelve tasks and schedule them as a single execution plan:

    GlobalLimitExec: fetch=1000
      CoalescePartitionsExec
        LocalLimitExec: fetch=1000
          FilterExec
            ParquetExec

This can be wrapped in a `TaskGroup` data structure that (simplified) looks
something like:
```rust
struct TaskGroup {
    /// Input partitions of the stage that this group will execute together
    partitions: Vec<usize>,
    /// Coalesced plan executed as a single task on one executor
    plan: Arc<dyn ExecutionPlan>,
}
```
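To make this concrete, here is a rough sketch of how the scheduler might build the coalesced plan for the example above, assuming the `TaskGroup` holds an ordinary DataFusion `ExecutionPlan`. The `coalesce_limit_stage` helper and its parameters are purely illustrative, not an existing API:

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: given the per-partition stage plan
/// (LocalLimitExec -> FilterExec -> ParquetExec) and the query's limit,
/// build one plan that executes all coalesced partitions on a single
/// executor and applies the limit once across them.
fn coalesce_limit_stage(
    stage_plan: Arc<dyn ExecutionPlan>,
    partitions: Vec<usize>,
    fetch: usize,
) -> TaskGroup {
    // Merge the partitions assigned to this executor into a single stream.
    let merged = Arc::new(CoalescePartitionsExec::new(stage_plan));
    // Apply the limit over the merged stream instead of per partition.
    // (Recent DataFusion versions take (input, skip, fetch) here; adjust
    // for the version in use.)
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(GlobalLimitExec::new(merged, 0, Some(fetch)));
    TaskGroup { partitions, plan }
}
```

In practice the coalesced plan would presumably still feed a shuffle write (or stream its single output partition directly to stage 2), but that part is omitted here.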
Then the stage 2 plan needs to be adjusted, since it will read a smaller number of input partitions (e.g. 12 instead of 144 in the example above, one per `TaskGroup`). Alternatively, we can generalize the existing data structures to handle this case.
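On the scheduling side, forming the groups could be as simple as bucketing a stage's pending partitions by the executor whose free slots they were assigned to. A minimal sketch, where the executor-id/partition assignments are assumed to come from the existing slot reservation logic:

```rust
use std::collections::HashMap;

/// Hypothetical sketch: bucket a stage's pending partitions by the executor
/// that has a free slot for them, yielding one group of partition ids per
/// executor instead of one independent task per partition.
fn group_by_executor(
    assignments: Vec<(String, usize)>, // (executor_id, partition)
) -> HashMap<String, Vec<usize>> {
    let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
    for (executor_id, partition) in assignments {
        groups.entry(executor_id).or_default().push(partition);
    }
    groups
}
```

Each resulting group would then become one `TaskGroup` for that executor.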
**Describe alternatives you've considered**
We could leave things as is.