rdettai commented on pull request #972: URL: https://github.com/apache/arrow-datafusion/pull/972#issuecomment-920074011
> I think it also may be related to Spark's scheduler which I think can control the number of concurrently "active" partitions. DataFusion just starts them all at once.

In my mind, DataFusion already supports any form of partition scheduling. It is just the `DefaultPhysicalPlanner` and the `datafusion::physical_plan::collect(ExecutionPlan)` helper that don't allow it. For a library user, the physical plan stage is just as much a public API, and because the `execute()` method works at the partition level, you have a lot of freedom in scheduling. Ballista is a great example of this, with different partitions being packaged into tasks and picked up by different executors. Of course, Ballista required a different planner, or at least some tweaks to the default plan to handle shuffle boundaries, but it seems to me that this is a "supported" (term to be defined 😄) way to use DataFusion.

> one core challenge in our understanding is that the term partition is overloaded with at least two meanings

I do think we agree that, however the data is structured in storage, we can map it to any number of partitions in the `ExecutionPlan`. And we all agree that we want this mapping to be tunable. I think the challenge in our understanding is more about how the `ExecutionPlan` partitioning is bound to the engine parallelism. If there is a 1 to 1 mapping today because of the `ExecutionPlan` produced by the current `DefaultPhysicalPlanner`, that is not a structural behavior of DataFusion. It is just the current default implementation (with #64 proposing to change that). But here, we are changing the API of the `TableProvider`/`LogicalPlan`, and the decisions we take while doing so should at least accommodate the most classical implementations of the planner.
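To illustrate the scheduling point, here is a minimal sketch of running only `n` out of `m` partitions at a time. It uses plain `std` threads and does not call DataFusion at all: `execute_partition` is a hypothetical stand-in for invoking `execute()` on one partition of a plan, and `run_partitions` is an illustrative name, not an existing API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for executing one partition of a physical plan.
/// It only returns a fake row count so the sketch stays self-contained.
fn execute_partition(partition: usize) -> usize {
    (partition + 1) * 10
}

/// Runs `m` partitions on at most `n` worker threads, so no more than `n`
/// partitions are active at once. Returns the per-partition results
/// (indexed by partition) and the peak number of active partitions observed.
fn run_partitions(m: usize, n: usize) -> (Vec<usize>, usize) {
    let next = Arc::new(AtomicUsize::new(0)); // next partition to claim
    let active = Arc::new(AtomicUsize::new(0)); // partitions currently running
    let peak = Arc::new(AtomicUsize::new(0)); // highest value `active` reached
    let results = Arc::new(Mutex::new(vec![0usize; m]));

    // Use at least one worker when there is work, and never more than `m`.
    let workers: Vec<_> = (0..n.max(1).min(m))
        .map(|_| {
            let next = Arc::clone(&next);
            let active = Arc::clone(&active);
            let peak = Arc::clone(&peak);
            let results = Arc::clone(&results);
            thread::spawn(move || loop {
                // Each worker claims the next unclaimed partition index.
                let partition = next.fetch_add(1, Ordering::SeqCst);
                if partition >= m {
                    break;
                }
                let now_active = active.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now_active, Ordering::SeqCst);
                let rows = execute_partition(partition);
                results.lock().unwrap()[partition] = rows;
                active.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
    let out = results.lock().unwrap().clone();
    (out, peak.load(Ordering::SeqCst))
}
```

Calling `run_partitions(8, 3)` executes all eight partitions while never having more than three in flight, which is the kind of decoupling between partition count and concurrency discussed here.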
We, Ballista (which, by the way, is where the original hardcoded value #708 took place) and #64 all agree that decoupling parallelism from the number of partitions is a valid use case and planning strategy. So what does `target_partitions` mean:
- it does not mean "target concurrency", because the execution might use a scheduler that runs only `n` out of `m` partitions at a time (and it does in Ballista)
- does it mean that the table provider should do its best to output that number of partitions? That fits the case where the execution maps parallelism to partitions 1 to 1, but in general it will be pretty unhelpful:
  - if a query scans a tiny amount of data, we will be forcing the table provider to split the data, and thus add overhead, for nothing
  - if a query scans a huge amount of data, our partitions will be huge, which might lead to more memory pressure

If we leave `target_partitions` (or `max_partitions`) configured at the `ParquetTable` level for now, we could simply serialize it in the `ParquetTableScanNode` instead of hardcoding it in `from_proto`. This way we avoid an API change at the `TableProvider` trait level, but we still fix #708.
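The serialization idea could look roughly like the sketch below. All types here are hypothetical, simplified stand-ins: `TableConfig` is not the real `ParquetTable`, `ScanNode` is not the real `ParquetTableScanNode` protobuf message, and the field names are assumptions made for illustration. The only point is that the value travels with the serialized node instead of being a constant inside `from_proto`.

```rust
/// Hypothetical, simplified stand-in for the table definition.
#[derive(Debug, Clone, PartialEq)]
struct TableConfig {
    path: String,
    max_partitions: usize,
}

/// Hypothetical, simplified stand-in for the serialized scan node.
/// The partition setting is carried as an explicit field.
#[derive(Debug, Clone, PartialEq)]
struct ScanNode {
    path: String,
    max_partitions: u64,
}

/// Serializes the table configuration, including its partition setting.
fn to_proto(table: &TableConfig) -> ScanNode {
    ScanNode {
        path: table.path.clone(),
        max_partitions: table.max_partitions as u64,
    }
}

/// Rebuilds the table configuration from the serialized node, reading the
/// partition setting from the node rather than from a hardcoded constant.
/// The cast assumes the value fits in `usize` on the receiving side.
fn from_proto(node: &ScanNode) -> TableConfig {
    TableConfig {
        path: node.path.clone(),
        max_partitions: node.max_partitions as usize,
    }
}
```

With this shape, whatever value was configured where the plan was built is the value the executor sees after deserialization, and the `TableProvider` trait itself is left untouched.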