2010YOUY01 opened a new issue, #23174: URL: https://github.com/apache/datafusion/issues/23174
### Is your feature request related to a problem or challenge?

Original discussion: https://github.com/apache/datafusion/pull/23026#issuecomment-4793157335

Summary: DataFusion mostly uses repartition-based parallelism today, but at some point we need to introduce intra-operator parallelism, and we have to do that carefully for performance.

The existing `SortExec` already exposes some internal worker parallelism: it can run a large number of concurrent workers for local sorting.
- https://github.com/apache/datafusion/blob/a00f7499e139c29e13b8b94baba262435bef417b/datafusion/physical-plan/src/sorts/sort.rs#L626-L634

This issue explains the background and proposes some ideas for improving this support.

### What is internal worker parallelism

DataFusion mostly uses repartition-based parallelism: each partition holds independent data, and one CPU core processes one partition. Here is a parallel aggregation query example:

<img width="1213" height="760" alt="Image" src="https://github.com/user-attachments/assets/4484b7fc-002c-4d17-adf0-5046e546fe15" />

For certain workloads, the assumptions behind repartitioning are not ideal. Here are 3 motivating examples.

#### Motivating Example 1: memory pressure case

Let's say we're doing a large sort (data size >> memory) on a machine with 32 cores and 64GB memory. The default setting executes it with 32 global partitions, and each partition runs a classic external sort algorithm (local sort, spill to disk, and finally read back and sort-preserving merge).

The issue is that the per-partition memory budget is low, so spilling may create smaller sorted runs, and the end-to-end execution requires extra spills, read-backs, and merging of smaller files.

A more ideal plan shape is:
- the scanner keeps 32 partitions, since scanning is not memory-intensive and scales with the number of CPU cores
- SortExec shrinks to 8 partitions, with 4 internal workers per partition.

This gives each partition a larger memory budget, so it can proceed more easily; there are also efficient algorithms to parallelize sort and sort-preserving merge within a partition.

<img width="1289" height="770" alt="Image" src="https://github.com/user-attachments/assets/213cf7fc-64a6-4a8b-85f1-b2757c22d2e9" />

#### Motivating Example 2: segment-tree based parallelism in window functions

The window query in the figure is impossible to parallelize with repartitioning, because repartitioning assumes data independence among partitions, while the query has one global partition and the window frame changes on every row.

Meanwhile, there is a highly parallel algorithm if we allow shared memory among partitions:
- https://www.vldb.org/pvldb/vol8/p1058-leis.pdf

Then the ideal query shape becomes

```
(any downstream exec)
-- RepartitionExec(round-robin on batch, input_partitions=1, output_partitions=32)
---- WindowExec(partition=1, internal_parallelism=32)
------ CoalescePartitionsExec(input_partitions=32, output_partitions=1)
-------- CsvExec(partition=32, internal_parallelism=1)
```

<img width="1322" height="745" alt="Image" src="https://github.com/user-attachments/assets/105395d1-f7a1-41d5-b4c8-9be4753f9f04" />

#### Motivating Example 3

I haven't checked this PR carefully yet, but it also seems to try to introduce intra-operator parallelism: https://github.com/apache/datafusion/pull/23124

### Describe the solution you'd like

- Establish conventions for intra-operator parallelism. For example, each execution plan stage may want to maintain a similar total concurrency level: `partition_count * internal_workers_per_partition`. See the CsvExec + WindowExec example above.
- Improve `Explain` output for internal parallelism. The existing single-partition `SortExec` can still use internal parallelism, but the plan currently looks serial, which makes it hard to inspect potential performance issues.
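
To make the concurrency convention and the memory-budget reasoning from Motivating Example 1 concrete, here is a minimal Rust sketch. It is not DataFusion code: `StageParallelism`, `plan_memory_intensive_stage`, the even split of the memory pool across partitions, and the 8 GiB per-partition floor used in the usage note are all assumptions made for illustration.

```rust
/// One gibibyte, used only to keep the example numbers readable.
pub const GIB: u64 = 1 << 30;

/// Hypothetical description of how one plan stage is parallelized.
/// This type does not exist in DataFusion; it only illustrates the convention.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StageParallelism {
    pub partition_count: usize,
    pub internal_workers_per_partition: usize,
}

impl StageParallelism {
    /// Total concurrency of the stage:
    /// `partition_count * internal_workers_per_partition`.
    pub fn total_concurrency(&self) -> usize {
        self.partition_count * self.internal_workers_per_partition
    }

    /// Memory available to each partition, ASSUMING the pool is divided
    /// evenly among partitions. Requires `partition_count > 0`.
    pub fn memory_per_partition(&self, pool_bytes: u64) -> u64 {
        pool_bytes / self.partition_count as u64
    }
}

/// Picks the largest partition count that divides `target_concurrency` and
/// still leaves every partition at least `min_bytes_per_partition`.
/// The remaining concurrency is assigned to internal workers, so the stage
/// keeps the same total concurrency as its neighbours.
/// Returns `None` if no partition count satisfies the memory floor.
pub fn plan_memory_intensive_stage(
    target_concurrency: usize,
    pool_bytes: u64,
    min_bytes_per_partition: u64,
) -> Option<StageParallelism> {
    (1..=target_concurrency)
        .rev()
        // Only consider partition counts that split the workers evenly.
        .filter(|&p| target_concurrency % p == 0)
        // Keep the first (largest) count that meets the memory floor.
        .find(|&p| pool_bytes / (p as u64) >= min_bytes_per_partition)
        .map(|p| StageParallelism {
            partition_count: p,
            internal_workers_per_partition: target_concurrency / p,
        })
}
```

With a 64 GiB pool and a target concurrency of 32, the default shape (32 partitions, 1 worker each) leaves 2 GiB per partition. Calling `plan_memory_intensive_stage(32, 64 * GIB, 8 * GIB)` under the assumed 8 GiB floor yields 8 partitions with 4 internal workers each, which matches the SortExec shape in Motivating Example 1 while keeping the total concurrency at 32.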
### Describe alternatives you've considered

_No response_

### Additional context

_No response_
