westonpace opened a new issue, #10073:
URL: https://github.com/apache/arrow-datafusion/issues/10073

   ### Describe the bug
   
   This is related / tangential to #9359
   
   My primary problem is that I am trying to sort 100 million strings and I consistently get errors like:
   
   ```
   Resources exhausted: Failed to allocate additional 39698712 bytes for ExternalSorterMerge[0] with 4053021824 bytes already allocated - maximum available is 26915520
   ```
   
   After reading through the sort implementation a bit I have noticed a few concerns (recorded under additional context below).
   
   ### To Reproduce
   
   I don't have a pure DataFusion reproduction, but this reproduces it for me in lance:
   
   ```
   import pyarrow as pa
   import lance

   # Once the dataset has been generated you can comment out this generation code
   # and reproduce the issue very quickly
   print("Generating data")
   my_strings = [f"string-{i}" * 3 for i in range(100 * 1024 * 1024)]
   my_table = pa.table({"my_strings": my_strings})

   print("Writing dataset")
   ds = lance.write_dataset(
       my_table, "/tmp/big_strings.lance", mode="overwrite", schema=my_table.schema
   )
   del my_table
   # End of generation code

   ds = lance.dataset("/tmp/big_strings.lance")
   print("Training scalar index")
   # To create a scalar index we must sort the column; this is where the error occurs
   ds.create_scalar_index("my_strings", "BTREE")
   ```
   
   ### Expected behavior
   
   I should be able to sort any number of strings, as long as I don't overflow the disk.
   
   ### Additional context
   
   Here is how I understand memory accounting in the sort today:
   
   - As batches arrive, they are placed in an accumulation queue, and the spillable reservation grows by their size.
   - Once the pool is out of space, we begin the spill process:
     - The first step of the spill process is to sort the accumulation queue (which, at this point, contains many batches).
     - Each batch becomes an input stream for a `SortPreservingMergeStream` (this is a LOT of inputs).
     - When a batch input stream is polled, the batch is sorted, the unsorted batch is dropped (and removed from the spillable reservation), and the sorted batch is returned.  The `SortPreservingMergeStream` then puts this batch in the batch builder (which adds it to the non-spillable reservation).  *This is a problem, as described below.*
     - As the sort preserving merge stream is polled, it polls the various inputs, fills up the batch builder, and then starts to emit output batches.
   - Back in the sort exec, the sort preserving merge stream is fully drained (`try_collect`).  *This is a problem, as described below.*  (A toy model of this accounting is sketched just after this list.)
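   
   To make the accounting concrete, here is a minimal toy model of the pool.  This is pure illustration (the class and method names only loosely echo DataFusion's `MemoryPool`/`try_grow`/`shrink`; the real implementation tracks separate spillable and unspillable reservations):
   
   ```
   class MemoryPool:
       """Toy model of a fixed-size memory pool (not DataFusion's actual type)."""
   
       def __init__(self, limit):
           self.limit = limit
           self.used = 0
   
       def try_grow(self, nbytes):
           if self.used + nbytes > self.limit:
               raise MemoryError(
                   f"Failed to allocate additional {nbytes} bytes with "
                   f"{self.used} bytes already allocated - maximum available "
                   f"is {self.limit - self.used}"
               )
           self.used += nbytes
   
       def shrink(self, nbytes):
           self.used -= nbytes
   
   
   pool = MemoryPool(limit=100)
   
   # 1. Batches arrive; the spillable reservation grows until the pool is full.
   pool.try_grow(100)
   
   # 2. Spill begins: sorting a batch frees its unsorted bytes but then asks
   #    for ~1.25x as many bytes back for the sorted copy (problem 1 below).
   pool.shrink(20)                    # unsorted batch dropped
   try:
       pool.try_grow(int(20 * 1.25))  # sorted copy is larger than what we freed
   except MemoryError as e:
       print(e)                       # mirrors the "Resources exhausted" error
   ```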
   
   The first problem (and the one causing my error) is that a sorted batch of strings (the output of `sort_batch`) occupies about 25% more memory than the unsorted batch.  I'm not sure if this is buffer alignment, padding, or some kind of 2x allocation strategy used by the sort, but it seems reasonable that something like this could happen.  Unfortunately, this is a problem: we are spilling because we have used up the entire memory pool.  We then take X bytes from the memory pool, convert them into 1.25 * X bytes, and try to put them back in the memory pool.  This fails with the error listed above.
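   
   For what it's worth, the equivalent operation is easy to probe from Python, since `sort_batch` is essentially "compute sort indices, then take".  The growth I'm seeing presumably comes from the Rust-side allocation behavior, so this pyarrow snippet may well report identical sizes; it just shows where the comparison happens:
   
   ```
   import pyarrow as pa
   import pyarrow.compute as pc
   
   arr = pa.array([f"string-{i}" * 3 for i in range(1_000_000)])
   
   # Roughly what sort_batch does internally: sort indices, then gather
   indices = pc.sort_indices(arr)
   sorted_arr = pc.take(arr, indices)
   
   # nbytes sums the sizes of the buffers each array references
   print("unsorted:", arr.nbytes)
   print("sorted:  ", sorted_arr.nbytes)
   ```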
   
   The second problem is that we are not accounting for the output of the sort preserving merge stream.  Each output batch from the merge stream is made up of rows drawn from the various input batches.  In the degenerate case, where the input data is fully random, every output batch pulls rows from every input stream, so we can't release any of the input batches until we emit the final output batch, while the collected output grows toward X bytes itself.  This means we will probably require 2 * X bytes.
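   
   Back-of-the-envelope, with purely illustrative numbers: if there are N input streams each pinning one batch of b bytes, then X = N * b and
   
   ```
   N, b = 64, 1_000_000    # hypothetical: 64 input streams, 1 MB batches
   X = N * b
   
   inputs_pinned = X       # no input batch can be freed until the last output
   collected_output = X    # try_collect accumulates every output batch too
   peak = inputs_pinned + collected_output
   print(f"peak = {peak / X:.0f} * X")  # -> peak = 2 * X
   ```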
   
   The solution to this second problem is that we should stream into the spill file.  We should not collect from the sort preserving merge stream and then write the collected batches into the spill file.  This problem is a bit less concerning to me at the moment because it is "datafusion uses more memory than it should" rather than "datafusion fails the plan with an error".  We don't do a lot of sorting in lance, so we can work around it reasonably well by halving the size of the spill pool.
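   
   As a sketch of the difference (using pyarrow's IPC writer purely for illustration; the actual fix would live in DataFusion's Rust spill path, and the function names here are hypothetical):
   
   ```
   import pyarrow as pa
   
   def spill_collected(stream, path, schema):
       # Roughly today's behavior: drain the merge stream first, then write.
       # Peak memory is the sum of every output batch.
       batches = list(stream)
       with pa.OSFile(path, "wb") as sink:
           with pa.ipc.new_file(sink, schema) as writer:
               for batch in batches:
                   writer.write_batch(batch)
   
   def spill_streaming(stream, path, schema):
       # Proposed behavior: write each batch as it is produced, so only one
       # output batch needs to be resident at a time.
       with pa.OSFile(path, "wb") as sink:
           with pa.ipc.new_file(sink, schema) as writer:
               for batch in stream:
                   writer.write_batch(batch)
   ```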

