maytasm commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3193321843


##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -345,12 +354,103 @@ public Entry<KeyType> apply(Entry<KeyType> entry)
 
   private void spill() throws IOException
   {
+    // Stream directly to a temp file first, then check the file size. If the file is small
+    // (serialized size much smaller than the pre-allocated buffer, e.g. HLL sketches in List mode),
+    // read it back into memory for batching to avoid creating thousands of tiny disk files.
+    // If the file is already large enough, keep it on disk as-is.
+    final File file;
     try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
-      files.add(spill(iterator));
-      dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+      file = spill(iterator);
+    }
+    pendingDictionaryEntries.addAll(keySerde.getDictionary());
+    grouper.reset();
+
+    final long fileSize = file.length();
+    if (fileSize < minSpillFileSize) {
+      pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
+      pendingSpillBytes += fileSize;
+      temporaryStorage.delete(file);
+
+      if (pendingSpillBytes >= minSpillFileSize) {
+        flushPendingRunsToDisk();
+      }
+    } else {
+      files.add(file);
+      dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
+      pendingDictionaryEntries.clear();
+    }
+  }
+
+  /**
+   * Merge-sorts all pending in-memory spill runs and writes them as a single sorted file to disk.
+   * Each run is already individually sorted (from grouper.iterator(true)); this method merges them
+   * so the output file is fully sorted, as required by iterator()'s mergeSorted across files.
+   */
+  private void flushPendingRunsToDisk() throws IOException
+  {
+    if (pendingSpillRuns.isEmpty()) {
+      return;
+    }
+
+    final Comparator<Entry<KeyType>> sortComparator =
+        sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator;
 
-      grouper.reset();
+    final List<MappingIterator<Entry<KeyType>>> readers = new ArrayList<>(pendingSpillRuns.size());
+    try {
+      for (final byte[] runBytes : pendingSpillRuns) {
+        readers.add(spillMapper.readValues(
+            spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),
+            spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz())
+        ));
+      }
+      final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(readers.size());
+      for (final MappingIterator<Entry<KeyType>> reader : readers) {
+        iterators.add(deserializeIterator(reader));
+      }
+      files.add(spill(CloseableIterators.mergeSorted(iterators, sortComparator)));
+      dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
     }
+    finally {
+      for (final MappingIterator<Entry<KeyType>> reader : readers) {
+        try {
+          reader.close();
+        }
+        catch (IOException e) {
+          log.warn(e, "Failed to close reader while flushing pending spill runs");
+        }
+      }
+      pendingSpillRuns.clear();
+      pendingSpillBytes = 0;
+      pendingDictionaryEntries.clear();
+    }
+  }
+
+  private CloseableIterator<Entry<KeyType>> deserializeIterator(final Iterator<Entry<KeyType>> iterator)
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.transform(
+            iterator,
+            new Function<>()
+            {
+              final ReusableEntry<KeyType> reusableEntry =
+                  ReusableEntry.create(keySerde, aggregatorFactories.length);
+
+              @Override
+              public Entry<KeyType> apply(Entry<KeyType> entry)
+              {
+                final Object[] deserializedValues = reusableEntry.getValues();
+                for (int i = 0; i < deserializedValues.length; i++) {
+                  deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
+                  if (deserializedValues[i] instanceof Integer) {

Review Comment:
   This is existing code:
https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java#L319
   I only extracted it into a shared method to avoid duplicating it.
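
   For readers following the hunk above, the merge step described in the
   Javadoc of flushPendingRunsToDisk() (several individually sorted runs
   combined into one fully sorted output) can be sketched in isolation as
   below. This is only an illustrative sketch using plain JDK collections:
   the class MergeSortedRuns, its Cursor helper, and this mergeSorted method
   are hypothetical names for the example and are not Druid's
   CloseableIterators.mergeSorted, which works on closeable iterators rather
   than in-memory lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, self-contained illustration; not part of the PR.
public class MergeSortedRuns
{
  // Holds the current head of one run plus the iterator supplying the rest of it.
  private static final class Cursor<T>
  {
    T head;
    final Iterator<T> rest;

    Cursor(Iterator<T> rest)
    {
      this.rest = rest;
      this.head = rest.next();
    }
  }

  // Merges runs that are each already sorted by `comparator` into one sorted list.
  public static <T> List<T> mergeSorted(List<List<T>> runs, Comparator<T> comparator)
  {
    // The heap orders cursors by their current head, so poll() yields the global minimum.
    PriorityQueue<Cursor<T>> heap =
        new PriorityQueue<>((a, b) -> comparator.compare(a.head, b.head));
    for (List<T> run : runs) {
      if (!run.isEmpty()) {
        heap.add(new Cursor<>(run.iterator()));
      }
    }

    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Cursor<T> smallest = heap.poll();
      out.add(smallest.head);
      // Advance the run we just consumed from and put it back if it has more entries.
      if (smallest.rest.hasNext()) {
        smallest.head = smallest.rest.next();
        heap.add(smallest);
      }
    }
    return out;
  }

  public static void main(String[] args)
  {
    List<List<Integer>> runs = Arrays.asList(
        Arrays.asList(1, 4, 9),
        Arrays.asList(2, 3, 10),
        Arrays.asList(5)
    );
    // prints [1, 2, 3, 4, 5, 9, 10]
    System.out.println(mergeSorted(runs, Comparator.<Integer>naturalOrder()));
  }
}
```

   The sketch materializes the result in a list for brevity; a streaming
   variant would hand out one entry at a time instead, which is closer to
   how a merged iterator can be passed straight to a writer.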



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

