rdettai commented on pull request #972:
URL: https://github.com/apache/arrow-datafusion/pull/972#issuecomment-920074011
> I think it also may be related to Spark's scheduler which I think can
> control the number of concurrently "active" partitions. DataFusion just
> starts them all at once.

In my mind, DataFusion already supports any form of scheduling of the
partitions. It is just the `DefaultPhysicalPlanner` and the
`datafusion::physical_plan::collect(ExecutionPlan)` helper that don't allow it.
For a library user, the physical plan stage is just as much a public API, and
because the `execute()` method works at the partition level, you have a lot of
freedom in scheduling. Ballista is a great example of this, with different
partitions being packaged into tasks and picked up by different executors.
Of course, Ballista required a different planner, or at least some tweaking of
the default plan to handle shuffle boundaries, but it seems to me that this is
a "supported" (term to be defined 😄) way to use DataFusion.
> one core challenge in our understanding is that the term partition is
> overloaded with at least two meanings

I do think we agree that however the data is structured in storage, we can map
it to any number of partitions in the `ExecutionPlan`, and we all agree that we
want this mapping to be tunable. I think the challenge in our understanding is
more about how the `ExecutionPlan` partitioning is bound to the engine
parallelism. If there is a 1-to-1 mapping today, it comes from the
`ExecutionPlan` produced by the current `DefaultPhysicalPlanner`; it is not a
structural behavior of DataFusion, just the current default implementation
(with #64 proposing to change that). But here we are changing the API at the
`TableProvider`/`LogicalPlan` level, and the decisions we take while doing so
should at least accommodate the most classical planner implementations. We,
Ballista (which, by the way, is where the original hardcoded value from #708
comes from) and #64 all agree that decoupling parallelism from the number of
partitions is a valid use case and planning strategy.
So what does `target_partitions` mean?
- It does not mean "target concurrency", because the execution might use a
  scheduler that runs only `n` out of `m` partitions at a time (and it does in
  Ballista); see the sketch after this list.
- Does it mean that the table provider should do its best to output that
  number of partitions? That fits the case where the execution maps parallelism
  to partitions 1 to 1, but in general it will be pretty unhelpful:
  - if a query scans a tiny amount of data, we force the table provider to
    split the data, adding overhead for nothing
  - if a query scans a huge amount of data, our partitions will be huge, which
    might lead to more memory pressure
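
To illustrate the first point, here is a rough sketch of a scheduler that keeps
only `n` partition streams in flight regardless of the `m` partitions the plan
declares, i.e. parallelism decoupled from partition count. `collect_bounded`,
`n` and `m` are illustrative names, with the same caveat as the previous
snippet about the exact `execute` signature:

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::ExecutionPlan;
use futures::{stream, StreamExt, TryStreamExt};

/// Run the `m` partitions of a plan with at most `n` of them active at a time.
/// The plan declares `m`; the caller alone decides `n`.
async fn collect_bounded(plan: Arc<dyn ExecutionPlan>, n: usize) -> Result<Vec<RecordBatch>> {
    let m = plan.output_partitioning().partition_count();
    let per_partition: Vec<Vec<RecordBatch>> = stream::iter(0..m)
        .map(|partition| {
            let plan = Arc::clone(&plan);
            async move {
                // Start this partition and drain it to completion.
                let batch_stream = plan.execute(partition).await?;
                let batches: Vec<RecordBatch> = batch_stream.try_collect().await?;
                Ok::<_, DataFusionError>(batches)
            }
        })
        // Only `n` of the partition futures are polled concurrently.
        .buffer_unordered(n)
        .try_collect()
        .await?;
    Ok(per_partition.into_iter().flatten().collect())
}
```
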
If we leave `target_partitions` (or `max_partitions`) configured at the
`ParquetTable` level for now, we could simply serialize it in the
`ParquetTableScanNode` instead of hardcoding it in `from_proto`. This way we
avoid an API change at the `TableProvider` trait level, but we still fix #708.
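
For what that could look like, here is a self-contained stand-in sketch: the
struct shapes below are illustrative, not the real Ballista protobuf or
`ParquetTable` types. The point is only that the scan node carries the value
and `from_proto` reads it instead of inventing one:

```rust
/// Illustrative stand-in for the serialized `ParquetTableScanNode` message,
/// extended with the table-level setting we want to round-trip.
struct ParquetTableScanNode {
    path: String,
    /// New field: written from the `ParquetTable` configuration at
    /// serialization time instead of being hardcoded on the read side.
    target_partitions: u32,
}

/// Illustrative stand-in for the provider rebuilt on the executor side.
struct ParquetTable {
    path: String,
    target_partitions: usize,
}

fn from_proto(node: &ParquetTableScanNode) -> ParquetTable {
    ParquetTable {
        path: node.path.clone(),
        // Previously a hardcoded constant (the issue in #708), now carried
        // by the plan itself.
        target_partitions: node.target_partitions as usize,
    }
}

fn main() {
    let node = ParquetTableScanNode {
        path: "/tmp/example.parquet".to_string(),
        target_partitions: 8,
    };
    assert_eq!(from_proto(&node).target_partitions, 8);
}
```
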