jtuglu1 commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3192193001
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
Review Comment:
Now that we are properly tracking bytes during deletion, I wonder if we
should also delete `dictionaryFiles` here.
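A minimal sketch of what that could look like. It assumes the grouper keeps spilled runs in a `files` list alongside `dictionaryFiles`, and that both can be released through the same delete path (for example a method reference to `LimitedTemporaryStorage.delete`) so the byte accounting covers dictionaries too. The class name `SpillFileCleanup` is hypothetical.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: spilled runs and spilled dictionaries are released through
// one delete callback, so whatever accounting the callback does applies to both.
class SpillFileCleanup
{
  private final List<File> files = new ArrayList<>();           // spilled runs (assumed field)
  private final List<File> dictionaryFiles = new ArrayList<>(); // spilled dictionaries
  private final Consumer<File> deleter;                         // e.g. a storage's delete(File)

  SpillFileCleanup(Consumer<File> deleter)
  {
    this.deleter = deleter;
  }

  void addRun(File run)
  {
    files.add(run);
  }

  void addDictionary(File dictionary)
  {
    dictionaryFiles.add(dictionary);
  }

  // Hands every tracked file to the deleter, then forgets them so a second call is a no-op.
  void deleteAll()
  {
    for (File f : files) {
      deleter.accept(f);
    }
    for (File f : dictionaryFiles) {
      deleter.accept(f);
    }
    files.clear();
    dictionaryFiles.clear();
  }

  int trackedCount()
  {
    return files.size() + dictionaryFiles.size();
  }
}
```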
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -72,6 +74,17 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
"Maximum number of spill files reached for this query. Try raising druid.query.groupBy.maxSpillFileCount."
);
+ /**
+ * Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are
+ * flushed as a single file to disk. Aggregators like ThetaSketch pre-allocate a large fixed buffer per row
+ * (e.g. ~131KB for ThetaSketch(K=16384)), causing the in-memory grouper to flush frequently. However, when
+ * each key has been seen only a few times, the sketch serializes to just a handful of bytes in compact form.
+ * Without batching, this produces thousands of tiny spill files. By accumulating runs in heap memory first
+ * and writing to disk only once this threshold is reached, we avoid that explosion in file count without any
+ * extra disk I/O for small spills.
+ */
+ private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB
Review Comment:
Let's make this a config. Sketch sizes can vary with the columns they cover
and their k values.
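A minimal sketch of the batching described in the `MIN_SPILL_FILE_BYTES` javadoc, with the threshold passed in by the caller (for example from a groupBy config property) instead of hard-coded at 1MB. `PendingRunBatcher` and its methods are hypothetical, and "files" are modeled as in-memory lists of runs rather than real disk writes; run boundaries are kept so the runs could still be merged later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffers small serialized runs in heap and "flushes" them
// as one spill file once a configurable byte threshold is reached.
class PendingRunBatcher
{
  private final long minSpillFileBytes; // would be read from config rather than a constant
  private final List<byte[]> pendingRuns = new ArrayList<>();
  private long pendingBytes = 0;
  // Stand-in for files on disk: each entry is one file holding several runs,
  // kept as separate arrays so run boundaries survive for a later merge.
  private final List<List<byte[]>> flushedFiles = new ArrayList<>();

  PendingRunBatcher(long minSpillFileBytes)
  {
    if (minSpillFileBytes <= 0) {
      throw new IllegalArgumentException("minSpillFileBytes must be positive");
    }
    this.minSpillFileBytes = minSpillFileBytes;
  }

  // Buffers one serialized run; flushes once the pending total reaches the threshold.
  void addRun(byte[] serializedRun)
  {
    pendingRuns.add(serializedRun);
    pendingBytes += serializedRun.length;
    if (pendingBytes >= minSpillFileBytes) {
      flush();
    }
  }

  // Also meant to be called before iteration, for runs that never reached the threshold.
  void flush()
  {
    if (pendingRuns.isEmpty()) {
      return;
    }
    flushedFiles.add(new ArrayList<>(pendingRuns));
    pendingRuns.clear();
    pendingBytes = 0;
  }

  int fileCount()
  {
    return flushedFiles.size();
  }

  long pendingBytes()
  {
    return pendingBytes;
  }
}
```

With a small threshold the effect is easy to see: several tiny runs share one file, and a final `flush()` picks up whatever is left over.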
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java:
##########
@@ -121,13 +122,15 @@ public void delete(final File file)
{
synchronized (files) {
if (files.contains(file)) {
+ final long fileSize = file.length();
try {
Files.delete(file.toPath());
}
catch (IOException e) {
log.warn(e, "Cannot delete file: %s", file);
}
files.remove(file);
+ bytesUsed.addAndGet(-fileSize);
Review Comment:
If this delete fails, I'd rather our accounting not become inaccurate as
well. Can we move this into the `try`, after the delete?
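A minimal sketch of the suggested ordering, where `bytesUsed` is decremented only once `Files.delete` has returned normally. Field names mirror the diff, but `TrackedTemporaryStorage`, its `track` method (standing in for the real write path), and the `System.err` logging (standing in for `log.warn`) are simplifications. It also keeps the file in `files` when deletion fails so a later cleanup can retry; that part is a design choice beyond what the comment asks for.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified storage: bytes are released from the accounting
// only after the file is confirmed deleted.
class TrackedTemporaryStorage
{
  private final Set<File> files = new HashSet<>();
  private final AtomicLong bytesUsed = new AtomicLong();

  // Stand-in for the real write path, which counts bytes as they are written.
  void track(File file)
  {
    synchronized (files) {
      if (files.add(file)) {
        bytesUsed.addAndGet(file.length());
      }
    }
  }

  void delete(final File file)
  {
    synchronized (files) {
      if (files.contains(file)) {
        final long fileSize = file.length(); // read before the file disappears
        try {
          Files.delete(file.toPath());
          // Reached only if the delete succeeded, so the accounting stays accurate.
          files.remove(file);
          bytesUsed.addAndGet(-fileSize);
        }
        catch (IOException e) {
          System.err.println("Cannot delete file: " + file + " (" + e + ")");
        }
      }
    }
  }

  long bytesUsed()
  {
    return bytesUsed.get();
  }
}
```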
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -293,6 +320,22 @@ public void setSpillingAllowed(final boolean spillingAllowed)
@Override
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
+ // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase.
+ try {
+ flushPendingRunsToDisk();
Review Comment:
We should check what the overhead of `sorted=false` is here. If we don't
need a sorted run as the end result, we can just do a simple concat and avoid
the decompress/re-sort overhead of a merge sort. I think we might need to
condition this on `sortHasNonGroupingFields=false` too.
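A minimal sketch of the choice being suggested: concatenate the spilled runs when `sorted` is false, and pay for a k-way merge (here a `PriorityQueue` of run heads) only when sorted output is required. `RunIterators` and `iterate` are hypothetical names. The sketch does not model the `sortHasNonGroupingFields` condition, and it assumes that combining equal keys across runs happens downstream, since a plain concat can emit the same key once per run.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical sketch: concat for unsorted output, k-way merge for sorted output.
final class RunIterators
{
  private RunIterators() {}

  static <T> Iterator<T> iterate(List<Iterator<T>> runs, boolean sorted, Comparator<T> cmp)
  {
    return sorted ? merge(runs, cmp) : concat(runs);
  }

  // Drains each run in order; no comparisons, no re-sorting.
  static <T> Iterator<T> concat(List<Iterator<T>> runs)
  {
    return new Iterator<T>() {
      private int i = 0;

      @Override
      public boolean hasNext()
      {
        while (i < runs.size() && !runs.get(i).hasNext()) {
          i++;
        }
        return i < runs.size();
      }

      @Override
      public T next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return runs.get(i).next();
      }
    };
  }

  // k-way merge of individually sorted runs: the heap holds the current head of each run.
  static <T> Iterator<T> merge(List<Iterator<T>> runs, Comparator<T> cmp)
  {
    PriorityQueue<Head<T>> heap = new PriorityQueue<Head<T>>((a, b) -> cmp.compare(a.value, b.value));
    for (Iterator<T> run : runs) {
      if (run.hasNext()) {
        heap.add(new Head<>(run.next(), run));
      }
    }
    return new Iterator<T>() {
      @Override
      public boolean hasNext()
      {
        return !heap.isEmpty();
      }

      @Override
      public T next()
      {
        Head<T> head = heap.poll();
        if (head == null) {
          throw new NoSuchElementException();
        }
        if (head.rest.hasNext()) {
          heap.add(new Head<>(head.rest.next(), head.rest));
        }
        return head.value;
      }
    };
  }

  private static final class Head<T>
  {
    final T value;
    final Iterator<T> rest;

    Head(T value, Iterator<T> rest)
    {
      this.value = value;
      this.rest = rest;
    }
  }
}
```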
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.