16pierre opened a new issue, #17334: URL: https://github.com/apache/datafusion/issues/17334
### Describe the bug

I'm trying to sort some input (a union of filtered Parquet files) with fixed parallelism. To do so, I'm manually inserting a round-robin `RepartitionExec` operator before the sort. I reckon this is slightly hacky and there may be alternatives, but the possibility of unions over large numbers of inputs makes partitioning trickier.

When doing so, I get memory allocation failures on the `RepartitionExec` operator (I suspect this could happen with any other non-spillable operator used as input to the sort node):

```
ResourcesExhausted("Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 145849856 bytes, ExternalSorterMerge[0] consumed 50000000 bytes, RepartitionExec[0] consumed 2951744 bytes. Error: Failed to allocate additional 2951744 bytes for RepartitionExec[0] with 2951744 bytes already allocated for this reservation - 1198400 bytes remain available for the total pool
```

If my understanding is correct, this is expected given the current sort implementation: spilling is only triggered after the memory pool is saturated and `try_grow` fails:

https://github.com/apache/datafusion/blob/033fc9b0a124082c6b4b6c68a8445c16bdaa51a1/datafusion/physical-plan/src/sorts/sort.rs#L787

This means that once the global memory pool has been filled up with sort allocations, any operator used as input to the sort can fail if it makes non-spillable memory allocations.

### To Reproduce

My current repro setup cannot be shared as a snippet because it relies on a bunch of production code paths. Roughly, what I'm doing is:

- write a Parquet file with a couple million rows (binary columns, 16 bytes each, UUIDs)
- read the Parquet file and run repartition + sort

```rust
// Memory allocator = FairSpillPool
let physical_plan = open_my_parquet_file(...);
// Note: I get failures with input partitions = 1 and sort_parallelism = 1
let partitioning = Partitioning::RoundRobinBatch(sort_parallelism as usize);
let repartitioned_exec = Arc::new(RepartitionExec::try_new(physical_plan, partitioning)?);
SortExec::new(sort_expression.clone(), repartitioned_exec)
```

Note: we're currently on DataFusion `46.0.1` (we will upgrade soon to pick up the recent sort features). A rough sketch of how the `FairSpillPool`-backed context is built is included at the bottom of this issue.

### Expected behavior

_No response_

### Additional context

The main workarounds/hacks that come to mind:

- register a fake spillable allocation to force `FairSpillPool` to spill earlier, before memory is completely full (rough sketch at the bottom of this issue)
- implement a custom `MemoryPool` to special-case how the different operators are tracked (rough sketch at the bottom of this issue)

Ideally I'd prefer a solution where we can precisely control how much memory is allocated to the sort compared to the rest of the operators, especially given the subtle memory allocations we ran into during the sort-merge phase; the "fake spillable allocation" approach is rather coarse-grained in that regard.
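For context, the memory-limited `SessionContext` in the repro is built roughly like this (a minimal sketch; the pool size is illustrative and `build_context` is just a helper name for this issue):

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

/// Builds a SessionContext whose memory pool is a FairSpillPool bounded to
/// `pool_bytes`; the expectation is that sorts spill instead of failing once
/// the limit is reached.
fn build_context(pool_bytes: usize) -> Result<SessionContext> {
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(pool_bytes)))
        .build_arc()?;
    Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
}
```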
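A rough sketch of the first workaround ("fake spillable allocation"), using the `MemoryConsumer` / `MemoryReservation` API from `datafusion::execution::memory_pool`. The consumer name and byte counts are made up, and how much headroom is actually needed would have to be tuned against `FairSpillPool`'s fair-share accounting:

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::memory_pool::{
    FairSpillPool, MemoryConsumer, MemoryPool, MemoryReservation,
};

/// Registers a reservation flagged as spillable and holds `headroom` bytes in it.
/// The intent is to take part of the pool away from the real spillable consumers
/// (ExternalSorter / ExternalSorterMerge) so they hit their limit and spill before
/// the pool is completely full, leaving room for non-spillable operators such as
/// RepartitionExec. The returned reservation must be kept alive for the duration
/// of the query; dropping it gives the bytes back to the pool.
fn reserve_fake_spillable(pool: &Arc<dyn MemoryPool>, headroom: usize) -> Result<MemoryReservation> {
    let mut reservation = MemoryConsumer::new("fake_spillable_headroom")
        .with_can_spill(true)
        .register(pool);
    reservation.try_grow(headroom)?;
    Ok(reservation)
}

fn main() -> Result<()> {
    // Hypothetical sizes: 1 GiB pool, 64 MiB of fake headroom.
    let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(1024 * 1024 * 1024));
    let _headroom = reserve_fake_spillable(&pool, 64 * 1024 * 1024)?;
    // ... build the RuntimeEnv with this pool and run the repartition + sort plan ...
    Ok(())
}
```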
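And a rough sketch of the second workaround (a custom `MemoryPool` that special-cases operators). Routing by consumer name is a heuristic based on the consumer names visible in the error above, the sub-pool types and sizes are arbitrary, and I haven't verified this against every version of the `MemoryPool` trait:

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::memory_pool::{
    FairSpillPool, GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation,
};

/// Routes sort-related consumers to a dedicated sub-pool and everything else
/// (RepartitionExec, etc.) to another, so a saturated sort cannot starve the
/// non-spillable operators feeding it.
#[derive(Debug)]
struct SplitMemoryPool {
    sort_pool: Arc<dyn MemoryPool>,
    other_pool: Arc<dyn MemoryPool>,
}

impl SplitMemoryPool {
    fn new(sort_bytes: usize, other_bytes: usize) -> Self {
        Self {
            sort_pool: Arc::new(FairSpillPool::new(sort_bytes)),
            other_pool: Arc::new(GreedyMemoryPool::new(other_bytes)),
        }
    }

    /// Heuristic: match the consumer names seen in the error message
    /// (ExternalSorter, ExternalSorterMerge).
    fn is_sort(name: &str) -> bool {
        name.starts_with("ExternalSorter")
    }

    fn pool_for(&self, reservation: &MemoryReservation) -> &Arc<dyn MemoryPool> {
        if Self::is_sort(reservation.consumer().name()) {
            &self.sort_pool
        } else {
            &self.other_pool
        }
    }
}

impl MemoryPool for SplitMemoryPool {
    fn register(&self, consumer: &MemoryConsumer) {
        if Self::is_sort(consumer.name()) {
            self.sort_pool.register(consumer)
        } else {
            self.other_pool.register(consumer)
        }
    }

    fn unregister(&self, consumer: &MemoryConsumer) {
        if Self::is_sort(consumer.name()) {
            self.sort_pool.unregister(consumer)
        } else {
            self.other_pool.unregister(consumer)
        }
    }

    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        self.pool_for(reservation).grow(reservation, additional)
    }

    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        self.pool_for(reservation).shrink(reservation, shrink)
    }

    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.pool_for(reservation).try_grow(reservation, additional)
    }

    fn reserved(&self) -> usize {
        self.sort_pool.reserved() + self.other_pool.reserved()
    }
}
```

Such a pool would then be wired in the same way as the `FairSpillPool` above, e.g. via `RuntimeEnvBuilder::new().with_memory_pool(Arc::new(SplitMemoryPool::new(...)))`.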