jtuglu1 commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3192789245
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -345,12 +354,103 @@ public Entry<KeyType> apply(Entry<KeyType> entry)
private void spill() throws IOException
{
+ // Stream directly to a temp file first, then check the file size. If the
file is small
+ // (serialized size much smaller than the pre-allocated buffer, e.g. HLL
sketches in List mode),
+ // read it back into memory for batching to avoid creating thousands of
tiny disk files.
+ // If the file is already large enough, keep it on disk as-is.
+ final File file;
try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
- files.add(spill(iterator));
- dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+ file = spill(iterator);
+ }
+ pendingDictionaryEntries.addAll(keySerde.getDictionary());
+ grouper.reset();
+
+ final long fileSize = file.length();
+ if (fileSize < minSpillFileSize) {
+ pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
+ pendingSpillBytes += fileSize;
+ temporaryStorage.delete(file);
+
+ if (pendingSpillBytes >= minSpillFileSize) {
+ flushPendingRunsToDisk();
+ }
+ } else {
+ files.add(file);
+ dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
+ pendingDictionaryEntries.clear();
+ }
+ }
+
+ /**
+ * Merge-sorts all pending in-memory spill runs and writes them as a single
sorted file to disk.
+ * Each run is already individually sorted (from grouper.iterator(true));
this method merges them
+ * so the output file is fully sorted, as required by iterator()'s
mergeSorted across files.
+ */
+ private void flushPendingRunsToDisk() throws IOException
+ {
+ if (pendingSpillRuns.isEmpty()) {
+ return;
+ }
+
+ final Comparator<Entry<KeyType>> sortComparator =
+ sortHasNonGroupingFields ? defaultOrderKeyObjComparator :
keyObjComparator;
- grouper.reset();
+ final List<MappingIterator<Entry<KeyType>>> readers = new
ArrayList<>(pendingSpillRuns.size());
+ try {
+ for (final byte[] runBytes : pendingSpillRuns) {
+ readers.add(spillMapper.readValues(
+ spillMapper.getFactory().createParser(new LZ4BlockInputStream(new
ByteArrayInputStream(runBytes))),
+
spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class,
keySerde.keyClazz())
+ ));
+ }
+ final List<CloseableIterator<Entry<KeyType>>> iterators = new
ArrayList<>(readers.size());
+ for (final MappingIterator<Entry<KeyType>> reader : readers) {
+ iterators.add(deserializeIterator(reader));
+ }
+ files.add(spill(CloseableIterators.mergeSorted(iterators,
sortComparator)));
+ dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
}
+ finally {
+ for (final MappingIterator<Entry<KeyType>> reader : readers) {
+ try {
+ reader.close();
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to close reader while flushing pending spill
runs");
+ }
+ }
+ pendingSpillRuns.clear();
+ pendingSpillBytes = 0;
+ pendingDictionaryEntries.clear();
+ }
+ }
+
+ private CloseableIterator<Entry<KeyType>> deserializeIterator(final
Iterator<Entry<KeyType>> iterator)
+ {
+ return CloseableIterators.withEmptyBaggage(
+ Iterators.transform(
+ iterator,
+ new Function<>()
+ {
+ final ReusableEntry<KeyType> reusableEntry =
+ ReusableEntry.create(keySerde, aggregatorFactories.length);
+
+ @Override
+ public Entry<KeyType> apply(Entry<KeyType> entry)
+ {
+ final Object[] deserializedValues = reusableEntry.getValues();
+ for (int i = 0; i < deserializedValues.length; i++) {
+ deserializedValues[i] =
aggregatorFactories[i].deserialize(entry.getValues()[i]);
+ if (deserializedValues[i] instanceof Integer) {
Review Comment:
Curious: why are we coercing to long values here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]