This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 781aba64000 perf: Optimize SpillingGrouper to avoid unnecessary disk
I/O for small spill runs (#19439)
781aba64000 is described below
commit 781aba640008cfaca39e413f53c9d2e9de371126
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Mon May 18 22:53:51 2026 -0700
perf: Optimize SpillingGrouper to avoid unnecessary disk I/O for small
spill runs (#19439)
---
.../groupby/epinephelinae/SpillOutputStream.java | 127 ++++++++++
.../groupby/epinephelinae/SpillingGrouper.java | 31 +--
.../epinephelinae/SpillOutputStreamTest.java | 271 +++++++++++++++++++++
.../groupby/epinephelinae/SpillingGrouperTest.java | 60 ++++-
4 files changed, 471 insertions(+), 18 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillOutputStream.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillOutputStream.java
new file mode 100644
index 00000000000..8d20bea534d
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillOutputStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * OutputStream that starts buffering in a heap byte array and switches to a
disk file via
+ * {@link LimitedTemporaryStorage} once the written bytes exceed the
threshold. This avoids
+ * the createFile/delete round-trip for small spills while bounding peak extra
heap to the
+ * threshold size.
+ */
+public class SpillOutputStream extends OutputStream
+{
+ private static final int INITIAL_BUFFER_SIZE = 4096;
+
+ private final LimitedTemporaryStorage temporaryStorage;
+ private final long threshold;
+ @Nullable
+ private ByteArrayOutputStream memoryBuffer;
+ private LimitedTemporaryStorage.LimitedOutputStream fileOut;
+ private boolean thresholdExceeded;
+
+ SpillOutputStream(LimitedTemporaryStorage temporaryStorage, long threshold)
+ {
+ this.temporaryStorage = temporaryStorage;
+ this.threshold = threshold;
+ this.memoryBuffer = new ByteArrayOutputStream((int) Math.min(threshold,
INITIAL_BUFFER_SIZE));
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ checkThreshold(1);
+ if (fileOut != null) {
+ fileOut.write(b);
+ } else {
+ memoryBuffer.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ checkThreshold(len);
+ if (fileOut != null) {
+ fileOut.write(b, off, len);
+ } else {
+ memoryBuffer.write(b, off, len);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ if (fileOut != null) {
+ fileOut.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (fileOut != null) {
+ fileOut.close();
+ }
+ }
+
+ boolean isInMemory()
+ {
+ return fileOut == null;
+ }
+
+ byte[] toByteArray()
+ {
+ return memoryBuffer.toByteArray();
+ }
+
+ File getFile()
+ {
+ return fileOut.getFile();
+ }
+
+ private void checkThreshold(int count) throws IOException
+ {
+ if (!thresholdExceeded && memoryBuffer.size() + count > threshold) {
+ thresholdExceeded = true;
+ switchToDisk();
+ }
+ }
+
+ private void switchToDisk() throws IOException
+ {
+ final LimitedTemporaryStorage.LimitedOutputStream out =
temporaryStorage.createFile();
+ try {
+ memoryBuffer.writeTo(out);
+ }
+ catch (IOException e) {
+ out.close();
+ throw e;
+ }
+ fileOut = out;
+ memoryBuffer = null;
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 7af71f92673..96e55907b21 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -47,6 +47,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -374,28 +375,24 @@ public class SpillingGrouper<KeyType> implements
Grouper<KeyType>
private void spill() throws IOException
{
- // Stream directly to a temp file first, then check the file size. If the
file is small
- // (serialized size much smaller than the pre-allocated buffer, e.g. HLL
sketches in List mode),
- // read it back into memory for batching to avoid creating thousands of
tiny disk files.
- // If the file is already large enough, keep it on disk as-is.
- final File file;
+ final SpillOutputStream spillOut = new SpillOutputStream(temporaryStorage,
minSpillFileSize);
try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
- file = spill(iterator);
+ serializeToStream(iterator, spillOut);
}
+
pendingDictionaryEntries.addAll(keySerde.getDictionary());
grouper.reset();
- final long fileSize = file.length();
- if (fileSize < minSpillFileSize) {
- pendingSpillRuns.add(Files.readAllBytes(file.toPath()));
- pendingSpillBytes += fileSize;
- temporaryStorage.delete(file);
+ if (spillOut.isInMemory()) {
+ final byte[] bytes = spillOut.toByteArray();
+ pendingSpillRuns.add(bytes);
+ pendingSpillBytes += bytes.length;
if (pendingSpillBytes >= minSpillFileSize) {
flushPendingRunsToDisk();
}
} else {
- files.add(file);
+ files.add(spillOut.getFile());
dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
pendingDictionaryEntries.clear();
}
@@ -483,20 +480,24 @@ public class SpillingGrouper<KeyType> implements
Grouper<KeyType>
);
}
- private <T> File spill(Iterator<T> iterator) throws IOException
+ private <T> void serializeToStream(Iterator<T> iterator, OutputStream out)
throws IOException
{
try (
- final LimitedTemporaryStorage.LimitedOutputStream out =
temporaryStorage.createFile();
final LZ4BlockOutputStream compressedOut = new
LZ4BlockOutputStream(out);
final JsonGenerator jsonGenerator =
spillMapper.getFactory().createGenerator(compressedOut)
) {
final SerializerProvider serializers =
spillMapper.getSerializerProviderInstance();
-
while (iterator.hasNext()) {
BaseQuery.checkInterrupted();
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializers, iterator.next());
}
+ }
+ }
+ private <T> File spill(Iterator<T> iterator) throws IOException
+ {
+ try (final LimitedTemporaryStorage.LimitedOutputStream out =
temporaryStorage.createFile()) {
+ serializeToStream(iterator, out);
return out.getFile();
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillOutputStreamTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillOutputStreamTest.java
new file mode 100644
index 00000000000..66a19b0a8a0
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillOutputStreamTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import org.apache.druid.query.groupby.GroupByStatsProvider;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+
+public class SpillOutputStreamTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testSmallWriteStaysInMemory() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(1024)) {
+ out.write(new byte[]{1, 2, 3});
+ Assert.assertTrue(out.isInMemory());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3}, out.toByteArray());
+ }
+ }
+
+ @Test
+ public void testExactlyAtThresholdStaysInMemory() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(4)) {
+ out.write(new byte[]{1, 2, 3, 4});
+ Assert.assertTrue(out.isInMemory());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3, 4}, out.toByteArray());
+ }
+ }
+
+ @Test
+ public void testExceedingThresholdSwitchesToDisk() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(4)) {
+ out.write(new byte[]{1, 2, 3, 4, 5});
+ Assert.assertFalse(out.isInMemory());
+ Assert.assertTrue(out.getFile().exists());
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, fileContent);
+ }
+ }
+
+ @Test
+ public void testSwitchesToDiskOnSecondWrite() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(4)) {
+ out.write(new byte[]{1, 2});
+ Assert.assertTrue(out.isInMemory());
+
+ out.write(new byte[]{3, 4, 5});
+ Assert.assertFalse(out.isInMemory());
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, fileContent);
+ }
+ }
+
+ @Test
+ public void testSingleByteWriteStaysInMemory() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(1024)) {
+ out.write(42);
+ Assert.assertTrue(out.isInMemory());
+ Assert.assertArrayEquals(new byte[]{42}, out.toByteArray());
+ }
+ }
+
+ @Test
+ public void testSingleByteWriteTriggersSwitch() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(2)) {
+ out.write(1);
+ out.write(2);
+ Assert.assertTrue(out.isInMemory());
+
+ out.write(3);
+ Assert.assertFalse(out.isInMemory());
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3}, fileContent);
+ }
+ }
+
+ @Test
+ public void testDataIntegrityAcrossSwitch() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(10)) {
+ byte[] beforeSwitch = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ byte[] afterSwitch = new byte[]{11, 12, 13, 14, 15};
+ out.write(beforeSwitch);
+ Assert.assertTrue(out.isInMemory());
+
+ out.write(afterSwitch);
+ Assert.assertFalse(out.isInMemory());
+ out.flush();
+
+ byte[] expected = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
14, 15};
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(expected, fileContent);
+ }
+ }
+
+ @Test
+ public void testWriteWithOffsetAndLength() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(1024)) {
+ byte[] data = new byte[]{0, 0, 1, 2, 3, 0, 0};
+ out.write(data, 2, 3);
+ Assert.assertTrue(out.isInMemory());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3}, out.toByteArray());
+ }
+ }
+
+ @Test
+ public void testWriteWithOffsetAndLengthTriggersDiskSwitch() throws
IOException
+ {
+ try (SpillOutputStream out = makeStream(2)) {
+ byte[] data = new byte[]{0, 1, 2, 3, 0};
+ out.write(data, 1, 3);
+ Assert.assertFalse(out.isInMemory());
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3}, fileContent);
+ }
+ }
+
+ @Test
+ public void testLargeWrite() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(100)) {
+ byte[] data = new byte[10_000];
+ Arrays.fill(data, (byte) 0xAB);
+ out.write(data);
+ Assert.assertFalse(out.isInMemory());
+ out.flush();
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(data, fileContent);
+ }
+ }
+
+ @Test
+ public void testZeroThresholdAlwaysGoesToDisk() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(0)) {
+ out.write(new byte[]{1});
+ Assert.assertFalse(out.isInMemory());
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(new byte[]{1}, fileContent);
+ }
+ }
+
+ @Test
+ public void testEmptyStreamIsInMemory() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(1024)) {
+ Assert.assertTrue(out.isInMemory());
+ Assert.assertArrayEquals(new byte[0], out.toByteArray());
+ }
+ }
+
+ @Test
+ public void testMultipleWritesAccumulateInMemory() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(1024)) {
+ out.write(new byte[]{1, 2});
+ out.write(new byte[]{3, 4});
+ out.write(5);
+ Assert.assertTrue(out.isInMemory());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, out.toByteArray());
+ }
+ }
+
+ @Test
+ public void testMultipleWritesAfterDiskSwitch() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(4)) {
+ out.write(new byte[]{1, 2, 3, 4, 5});
+ Assert.assertFalse(out.isInMemory());
+
+ out.write(new byte[]{6, 7});
+ out.write(8);
+ out.flush();
+
+ byte[] fileContent = Files.readAllBytes(out.getFile().toPath());
+ Assert.assertArrayEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8},
fileContent);
+ }
+ }
+
+ @Test
+ public void testDiskStorageBytesTracked() throws IOException
+ {
+ LimitedTemporaryStorage storage = makeStorage(1024 * 1024);
+
+ try (SpillOutputStream out = new SpillOutputStream(storage, 4)) {
+ out.write(new byte[]{1, 2, 3, 4, 5});
+ Assert.assertFalse(out.isInMemory());
+ out.flush();
+ Assert.assertTrue(storage.currentSize() > 0);
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testToByteArrayThrowsAfterDiskSwitch() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(4)) {
+ out.write(new byte[]{1, 2, 3, 4, 5});
+ Assert.assertFalse(out.isInMemory());
+ out.toByteArray();
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetFileThrowsWhenInMemory() throws IOException
+ {
+ try (SpillOutputStream out = makeStream(1024)) {
+ out.write(new byte[]{1, 2, 3});
+ Assert.assertTrue(out.isInMemory());
+ out.getFile();
+ }
+ }
+
+ @Test(expected = TemporaryStorageFullException.class)
+ public void testDiskStorageLimitEnforced() throws IOException
+ {
+ LimitedTemporaryStorage storage = makeStorage(10);
+
+ try (SpillOutputStream out = new SpillOutputStream(storage, 4)) {
+ byte[] data = new byte[100];
+ Arrays.fill(data, (byte) 1);
+ out.write(data);
+ }
+ }
+
+ private SpillOutputStream makeStream(long threshold) throws IOException
+ {
+ return new SpillOutputStream(makeStorage(1024 * 1024), threshold);
+ }
+
+ private LimitedTemporaryStorage makeStorage(long maxBytes) throws IOException
+ {
+ return new LimitedTemporaryStorage(
+ temporaryFolder.newFolder(),
+ maxBytes,
+ 100,
+ new GroupByStatsProvider.PerQueryStats()
+ );
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java
index 0fa77f988ad..3ec12156823 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouperTest.java
@@ -286,10 +286,43 @@ public class SpillingGrouperTest extends
InitializedNullHandlingTest
}
}
+ @Test
+ public void testSmallSpillsStayInMemoryUntilFlush() throws IOException
+ {
+ final File storageDir = temporaryFolder.newFolder();
+ final LimitedTemporaryStorage temporaryStorage =
+ new LimitedTemporaryStorage(storageDir, 1024 * 1024, 100, new
GroupByStatsProvider.PerQueryStats());
+
+ // Use a large minSpillFileSize so individual spills (which are tiny with
a 50-byte buffer)
+ // stay in memory via SpillOutputStream and never create temp files.
+ final long largeMinSpillFileSize = 1024 * 1024L;
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50, temporaryStorage,
largeMinSpillFileSize)) {
+ // Aggregate enough keys to trigger multiple spills, but not enough to
exceed
+ // minSpillFileSize in total pending bytes.
+ for (int i = 0; i < 20; i++) {
+ Assert.assertTrue(grouper.aggregate(new IntKey(i)).isOk());
+ }
+
+ // No files should have been created — all spills are below the
threshold and
+ // pending bytes haven't reached minSpillFileSize yet.
+ Assert.assertEquals(
+ "small spills should stay in memory without creating any temp files",
+ 0,
+ temporaryStorage.currentFileCount()
+ );
+ Assert.assertEquals(0, storageDir.listFiles().length);
+
+ // Results should still be correct when iterated.
+ assertResultsCorrect(grouper, 20, 1);
+ }
+ }
+
@Test
public void testDiskFull() throws IOException
{
- try (SpillingGrouper<IntKey> grouper = makeGrouper(50,
temporaryFolder.newFolder(), 10, 100)) {
+ // Use a small minSpillFileSize so pending runs flush to disk frequently,
where the
+ // 500-byte maxStorageBytes limit will be hit.
+ try (SpillingGrouper<IntKey> grouper = makeGrouper(50,
temporaryFolder.newFolder(), 500, 100, 100)) {
AggregateResult lastResult = AggregateResult.ok();
for (int i = 0; i < 10000 && lastResult.isOk(); i++) {
lastResult = grouper.aggregate(new IntKey(i));
@@ -345,10 +378,22 @@ public class SpillingGrouperTest extends
InitializedNullHandlingTest
long maxStorageBytes,
int maxFileCount
)
+ {
+ return makeGrouper(bufferSize, storageDir, maxStorageBytes, maxFileCount,
1024 * 1024L);
+ }
+
+ private SpillingGrouper<IntKey> makeGrouper(
+ int bufferSize,
+ File storageDir,
+ long maxStorageBytes,
+ int maxFileCount,
+ long minSpillFileSize
+ )
{
return makeGrouper(
bufferSize,
- new LimitedTemporaryStorage(storageDir, maxStorageBytes, maxFileCount,
new GroupByStatsProvider.PerQueryStats())
+ new LimitedTemporaryStorage(storageDir, maxStorageBytes, maxFileCount,
new GroupByStatsProvider.PerQueryStats()),
+ minSpillFileSize
);
}
@@ -356,6 +401,15 @@ public class SpillingGrouperTest extends
InitializedNullHandlingTest
int bufferSize,
LimitedTemporaryStorage temporaryStorage
)
+ {
+ return makeGrouper(bufferSize, temporaryStorage, 1024 * 1024L);
+ }
+
+ private SpillingGrouper<IntKey> makeGrouper(
+ int bufferSize,
+ LimitedTemporaryStorage temporaryStorage,
+ long minSpillFileSize
+ )
{
final GroupByTestColumnSelectorFactory columnSelectorFactory =
GrouperTestUtil.newColumnSelectorFactory();
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value",
1L)));
@@ -374,7 +428,7 @@ public class SpillingGrouperTest extends
InitializedNullHandlingTest
null,
false,
bufferSize,
- 1024 * 1024L,
+ minSpillFileSize,
new GroupByStatsProvider.PerQueryStats()
);
grouper.init();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]