16pierre opened a new issue, #17334:
URL: https://github.com/apache/datafusion/issues/17334

   ### Describe the bug
   
   I'm trying to sort some input (union of filtered Parquet files) with fixed 
parallelism, in order to do so I'm manually fiddling with a round-robin 
`RepartitionExec` operator before the sort. I reckon this is slightly hacky, 
they may be alternatives, the possibility of unions of large numbers of inputs 
makes partitioning trickier though.
   
   When doing so, I get memory allocation failures on the `RepartitionExec` 
operator (I suspect this could happen with other non-spillable operators used 
as input of the Sort nodes though):
   
   ```
   ResourcesExhausted(\"Additional allocation failed with top memory consumers 
(across reservations) as: ExternalSorter[0] consumed 145849856 bytes, 
ExternalSorterMerge[0] consumed 50000000 bytes, RepartitionExec[0] consumed 
2951744 bytes. Error: Failed to allocate additional 2951744 bytes for 
RepartitionExec[0] with 2951744 bytes already allocated for this reservation - 
1198400 bytes remain available for the total pool
   ```
   
   If my understanding is correct, this feels expected given the current sort 
implementation: we trigger spill only after saturating the memory pool and 
getting allocation failure on `try_grow`:
   
https://github.com/apache/datafusion/blob/033fc9b0a124082c6b4b6c68a8445c16bdaa51a1/datafusion/physical-plan/src/sorts/sort.rs#L787
   
   Which means that once the global memory pool is filled up with sort 
allocations, any operator used as input of the sort can fail if it uses 
non-spillable memory allocations.
   
   ### To Reproduce
   
   My current repro setup cannot be shared as a snippet because it uses a bunch 
of production codepaths.
   
   Roughly what I'm doing is:
   - write a Parquet file with a couple millions rows (binary columns, 16 bytes 
each, UUIDs)
   - read the Parquet file and run repartition + sort
   
   ```
   // Memory allocator = FairSpillPool
   
   let physical_plan = open_my_parquet_file(...);
   
   // Note: I get failures with input partitions = 1 and sort_parallelism = 1
   let partitioning = Partitioning::RoundRobinBatch(sort_parallelism as usize);
   let repartitioned_exec = Arc::new(RepartitionExec::try_new(physical_plan, 
partitioning))
   SortExec::new(sort_expression.clone(), repartitioned_exec)
   ```
   
   Note we're currently on Datafusion `46.0.1` (will upgrade soon to pick up 
recent sort features)
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   Main work-arounds/hacks that come to mind:
   - register a fake spillable allocation to force `FairSpillPool` to spill 
earlier than before memory is full
   - implement custom MemoryPool to special-case how different operators are 
tracked
   
   Ideally I'd prefer a solution where we can precisely control how much memory 
is allocated to the sort compared to the rest of the operators, especially 
given the subtle memory allocations we faced during the sort merge phase - the 
solution "fake spillable allocation" is a bit coarse-grain with that regards.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to