rdettai edited a comment on pull request #972:
URL: https://github.com/apache/arrow-datafusion/pull/972#issuecomment-920074011


   > I think it also may be related to Spark's scheduler which I think can 
control the number of concurrently "active" partitions. DataFusion just starts 
them all at once.
   
   In my mind, DataFusion already supports any form of scheduling of the 
partitions. It is just the `DefaultPhysicalPlanner` and the 
`datafusion::physical_plan::collect(ExecutionPlan)` helper that don't allow it. 
As a library user, the physical plan stage is just as much a public API, and 
thanks to the `ExecutionPlan.execute(partition_id)` method being at the 
partition level, you have a lot of freedom in scheduling. Ballista is a great 
example of this, with different partitions being packaged into tasks and 
picked up by different executors. Of course, Ballista required a different 
planner, or at least to tweak the default plan a bit to handle shuffle 
boundaries, but it seems to me that this is a "supported" (term to be defined 
😄) way to use DataFusion. 
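
   To make the scheduling point concrete, here is a minimal sketch of running only `n` out of `m` partitions at a time. It uses only the standard library. `PartitionedPlan`, `ToyPlan` and `run_bounded` are hypothetical names made up for this illustration; the trait only mimics the idea of a per-partition `execute(partition_id)` and is not DataFusion's actual `ExecutionPlan` API.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a plan that is executed one partition at a time.
/// This is NOT DataFusion's `ExecutionPlan` trait, only an illustration.
trait PartitionedPlan: Send + Sync {
    fn partition_count(&self) -> usize;
    fn execute(&self, partition: usize) -> Vec<i64>;
}

/// Toy plan: partition `p` yields the three values p*10, p*10+1, p*10+2.
struct ToyPlan {
    partitions: usize,
}

impl PartitionedPlan for ToyPlan {
    fn partition_count(&self) -> usize {
        self.partitions
    }

    fn execute(&self, partition: usize) -> Vec<i64> {
        let base = partition as i64 * 10;
        (base..base + 3).collect()
    }
}

/// Executes every partition of `plan` with at most `max_concurrent` worker
/// threads. Each worker repeatedly claims the next unclaimed partition.
/// Results are returned in partition order.
fn run_bounded(plan: Arc<dyn PartitionedPlan>, max_concurrent: usize) -> Vec<Vec<i64>> {
    let m = plan.partition_count();
    let next = Arc::new(Mutex::new(0usize));
    let results = Arc::new(Mutex::new(vec![Vec::<i64>::new(); m]));
    // Never spawn more workers than partitions, and always at least one.
    let workers = max_concurrent.max(1).min(m.max(1));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let plan = Arc::clone(&plan);
            let next = Arc::clone(&next);
            let results = Arc::clone(&results);
            thread::spawn(move || loop {
                // Claim a partition index; the lock is released before executing.
                let p = {
                    let mut n = next.lock().unwrap();
                    if *n >= m {
                        break;
                    }
                    let p = *n;
                    *n += 1;
                    p
                };
                let batch = plan.execute(p);
                results.lock().unwrap()[p] = batch;
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let out = results.lock().unwrap().clone();
    out
}
```

   Calling `run_bounded(Arc::new(ToyPlan { partitions: 5 }), 2)` runs five partitions with two workers, so the number of partitions and the parallelism are chosen independently.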
   
   > one core challenge in our understanding is that the term partition is 
overloaded with at least two meanings
   
   I do think that we agree that however the data is structured in the storage, 
we can map it to any number of partitions in the `ExecutionPlan`. And we all 
agree that this mapping should be tunable because for the same dataset, 
different hardware or engines using DataFusion will perform differently 
according to how this mapping is performed. I think the challenge in our 
understanding is more about how the `ExecutionPlan` partitioning is bound to 
the engine parallelism. If there is a 1 to 1 mapping today because of the 
`ExecutionPlan` proposed by the current `DefaultPhysicalPlanner`, that is not a 
structural behavior of DataFusion. This is just the current default 
implementation (with #64 proposing to change that). But here, we are changing 
the API of the `TableProvider`/`LogicalPlan`, and the decisions we take while 
doing so should at least accommodate the most classical implementations for the 
planner. We, Ballista (which, by the way, is where the original hardcoded value 
of #708 came from) and #64 all agree that having a parallelism decoupled from 
the number of partitions is a valid use case and planning strategy.
   
   So what does `target_partitions` mean:
   - it does not mean "target concurrency", because the execution might use a 
scheduler that runs only `n` out of `m` partitions at a time (and it does in 
Ballista)
   - does it mean that the table provider should do its best to output that 
number of partitions? That fits the case where the execution maps parallelism to 
partitions 1 to 1, but in general it will be pretty unhelpful:
     - if a query scans a tiny amount of data, we will be forcing the table 
provider to split the data, thus adding overhead, for nothing
     - if a query scans a huge amount of data, our partitions will be huge 
which might lead to more memory pressure
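
   The two sub-cases above come down to simple arithmetic. As a rough sketch, assuming the scan is split evenly into exactly `target_partitions` pieces (the function name `bytes_per_partition` is hypothetical and the byte sizes are only examples):

```rust
/// Size of each partition when a scan of `total_bytes` is always split into
/// `target_partitions` pieces, rounded up. Hypothetical helper for illustration.
fn bytes_per_partition(total_bytes: u64, target_partitions: u64) -> u64 {
    let parts = target_partitions.max(1); // guard against division by zero
    (total_bytes + parts - 1) / parts // ceiling division
}
```

   With a target of 16, a 16 KiB scan gives `bytes_per_partition(16 * 1024, 16) == 1024`, i.e. sixteen 1 KiB partitions that mostly add overhead, while a 1 TiB scan gives `bytes_per_partition(1 << 40, 16) == 1 << 36`, i.e. 64 GiB per partition.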
   
   If we leave `target_partitions` (or `max_partitions`) configured at the 
`ParquetTable` level for now, we could simply serialize it in the 
`ParquetTableScanNode` instead of hardcoding it in `from_proto`. This way we 
avoid an API level change at the `TableProvider` trait level, but we still fix 
#708.
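
   As a rough sketch of that idea, with made-up types: `ScanNodeSketch` stands in for a scan message such as `ParquetTableScanNode`, and `TableSketch` for the table configuration. Neither reflects the real protobuf definition or the real `ParquetTable` fields. The only point is that the value travels with the message instead of being a constant inside `from_proto`.

```rust
/// Hypothetical wire message standing in for a scan node.
/// The `target_partitions` field is the assumed addition.
#[derive(Debug, Clone, PartialEq)]
struct ScanNodeSketch {
    path: String,
    target_partitions: u32,
}

/// Hypothetical in-memory table configuration.
#[derive(Debug, Clone, PartialEq)]
struct TableSketch {
    path: String,
    target_partitions: usize,
}

/// Serialization side: carry the configured value in the message.
fn to_proto(table: &TableSketch) -> ScanNodeSketch {
    ScanNodeSketch {
        path: table.path.clone(),
        // Saturate rather than wrap if the value does not fit in u32.
        target_partitions: table.target_partitions.min(u32::MAX as usize) as u32,
    }
}

/// Deserialization side: read the value back instead of hardcoding it.
fn from_proto(node: &ScanNodeSketch) -> TableSketch {
    TableSketch {
        path: node.path.clone(),
        target_partitions: node.target_partitions as usize,
    }
}
```

   A round trip such as `from_proto(&to_proto(&table))` then preserves whatever `target_partitions` the table was created with.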


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

