rdettai edited a comment on pull request #972:
URL: https://github.com/apache/arrow-datafusion/pull/972#issuecomment-920074011
> I think it also may be related to Spark's scheduler which I think can
control the number of concurrently "active" partitions. DataFusion just starts
them all at once.
In my mind, DataFusion already supports any form of scheduling of the
partitions. It is just the `DefaultPhysicalPlanner` and the
`datafusion::physical_plan::collect(ExecutionPlan)` helper that don't allow it.
As a library user, the physical plan stage is just as much a public API, and
thanks to the `ExecutionPlan.execute(partition_id)` method being at the
partition level, you have a lot of freedom on scheduling. Ballista is a great
example for this, with different partitions being packaged into tasks and and
picked up by different executors. Of course, Ballista required a different
planner, or at least to tweak the default plan a bit to handle shuffle
boundaries, but it seems to me that this is a "supported" (term to be defined
😄) way to use DataFusion.
> one core challenge in our understanding is that the term partition is
overloaded with at least two meanings
I do think that we agree that whatever layout we have for the data in the
storage, we can map it to any number of partition in the `ExecutionPlan`. And
we all agree that this mapping should be tunable because for the same dataset,
different compute hardware or engines using DataFusion will perform differently
according to how this mapping is performed. I think the challenge in our
understanding is more about how the `ExecutionPlan` partitioning is bound to
the engine parallelism. If there is a 1 to 1 mapping today because of the
`ExecutionPlan` proposed by the current `DefaultPhysicalPlanner`, that is not a
structural behavior of DataFusion. This is just the current default
implementation (with #64 proposing to change that). But here, we are changing
the API of the `TableProvider`/`LogicalPlan`, and the decisions we take while
doing so should at least accommodate the most classical implementations for the
planner. We, Ballista (which, by the way, is where the original hardcoded va
lue #708 took place) and #64 all agree that having a parallelism decoupled
from the number of partitions is a valid usecase and planning strategy.
So what does `target_partitions` mean:
- it does not mean "target concurrency", because the execution might use a
scheduler that runs only `n` out of `m` partitions at a time (and it does in
Ballista)
- does it mean that the table provider should do its best to output that
number of partition? that is a setting that fits the case where the execution
maps parallelism to partitions 1 to 1, but in general it will not scale well:
- if a query scans a tiny amount of data we will be forcing the table
provider to split the data, thus add overhead, for nothing
- if a query scans a huge amount of data, our partitions will be huge
which might lead to more memory pressure
If there is too much ambiguity around the `target_partitions` (or
`max_partitions`) parameter (and I feel there is), my advice would be to leave
it configured at the `ParquetTable` level for now. We could simply serialize it
in the `ParquetTableScanNode` to fix the hardcoding issue in `from_proto`. This
way we avoid an API change in the `TableProvider` trait, but we still fix #708.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]