jinchengchenghh commented on issue #8025:
URL: 
https://github.com/apache/incubator-gluten/issues/8025#issuecomment-2496534497

   Spark also opens all the spill files for reading:
   ```
   final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(
       recordComparatorSupplier.get(), prefixComparator, spillWriters.size());
   for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
     spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager));
   }
   if (inMemSorter != null) {
     readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
     spillMerger.addSpillIfNotEmpty(readingIterator);
   }
   return spillMerger.getSortedIterator();
   ```
   Spark uses a `PriorityQueue<UnsafeSorterIterator> priorityQueue` to pick the next record to merge:
   ```
   Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
     int prefixComparisonResult =
       prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
     if (prefixComparisonResult == 0) {
       return recordComparator.compare(
         left.getBaseObject(), left.getBaseOffset(), left.getRecordLength(),
         right.getBaseObject(), right.getBaseOffset(), right.getRecordLength());
     } else {
       return prefixComparisonResult;
     }
   };
   priorityQueue = new PriorityQueue<>(numSpills, comparator);
   ```
    There are configs to control read-ahead and the read buffer size (default 1 MB):
   ```
   private[spark] val UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED =
     ConfigBuilder("spark.unsafe.sorter.spill.read.ahead.enabled")
       .internal()
       .version("2.3.0")
       .booleanConf
       .createWithDefault(true)

   private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
     ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size")
       .internal()
       .version("2.1.0")
       .bytesConf(ByteUnit.BYTE)
       .checkValue(v => 1024 * 1024 <= v && v <= MAX_BUFFER_SIZE_BYTES,
         s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
       .createWithDefault(1024 * 1024)
   ```
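   Although both configs are marked `.internal()`, they can still be set like any other Spark conf. A sketch of how they might appear in `spark-defaults.conf` (the 4 MB value below is only an illustrative choice, not a recommendation):
   ```
   spark.unsafe.sorter.spill.read.ahead.enabled   true
   spark.unsafe.sorter.spill.reader.buffer.size   4m
   ```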
   In class `UnsafeSorterSpillReader`, the input stream is wrapped accordingly:
   ```
   if (readAheadEnabled) {
     this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
         bufferSizeBytes);
   } else {
     this.in = serializerManager.wrapStream(blockId, bs);
   }
   this.din = new DataInputStream(this.in);
   ```
   
   
   It only needs to load one record per spill reader at a time; after a record is consumed, the `UnsafeSorterIterator` reader is re-inserted into the priorityQueue so it can load its next record.
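   The merge pattern described above can be sketched in plain Java. This is a hypothetical demo, not Spark code: each `Stream` plays the role of an `UnsafeSorterIterator`, and the queue holds at most one loaded record per stream at any moment.
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.PriorityQueue;

   public class KWayMergeDemo {
       // Wraps an iterator together with its most recently loaded record,
       // mirroring how the spill merger keeps one record per spill reader.
       static final class Stream implements Comparable<Stream> {
           final Iterator<Integer> it;
           int current; // the single record currently loaded from this stream

           Stream(Iterator<Integer> it) {
               this.it = it;
               this.current = it.next(); // load the first record
           }

           public int compareTo(Stream other) {
               return Integer.compare(current, other.current);
           }
       }

       static List<Integer> merge(List<List<Integer>> sortedLists) {
           PriorityQueue<Stream> queue = new PriorityQueue<>();
           for (List<Integer> list : sortedLists) {
               if (!list.isEmpty()) {            // addSpillIfNotEmpty analogue
                   queue.add(new Stream(list.iterator()));
               }
           }
           List<Integer> out = new ArrayList<>();
           while (!queue.isEmpty()) {
               Stream s = queue.poll();          // stream with the smallest record
               out.add(s.current);               // emit that one record
               if (s.it.hasNext()) {
                   s.current = s.it.next();      // load exactly one more record
                   queue.add(s);                 // re-insert to compete again
               }
           }
           return out;
       }

       public static void main(String[] args) {
           List<List<Integer>> spills = List.of(
               List.of(1, 4, 9), List.of(2, 3, 10), List.of(5, 6));
           System.out.println(merge(spills)); // [1, 2, 3, 4, 5, 6, 9, 10]
       }
   }
   ```
   At any point the queue contains one record per live stream, so memory stays bounded by the number of spills, not by the total data size.
   
   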


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

