[FLINK-8733][network] fix SpillableSubpartition#spillFinishedBufferConsumers() not counting spilled bytes
This closes #5549. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4bf76ae6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4bf76ae6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4bf76ae6 Branch: refs/heads/master Commit: 4bf76ae69e3f22e25c2dad3e802be094554b5d43 Parents: 915213c Author: Nico Kruber <[email protected]> Authored: Tue Feb 20 18:04:12 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Feb 27 09:07:13 2018 +0100 ---------------------------------------------------------------------- .../partition/SpillableSubpartition.java | 5 ++++- .../partition/SpillableSubpartitionTest.java | 21 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4bf76ae6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 8758b34..6ac493e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -240,13 +241,15 @@ class SpillableSubpartition extends ResultSubpartition { return 0; } - private long spillFinishedBufferConsumers() throws IOException { + @VisibleForTesting + protected long spillFinishedBufferConsumers() throws IOException { long spilledBytes = 0; while (!buffers.isEmpty()) { BufferConsumer bufferConsumer = buffers.peek(); Buffer buffer = bufferConsumer.build(); updateStatistics(buffer); + spilledBytes += buffer.getSize(); spillWriter.writeBlock(buffer); if (bufferConsumer.isFinished()) { http://git-wip-us.apache.org/repos/asf/flink/blob/4bf76ae6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index 9dc7bed..65d98e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; @@ -40,12 +41,14 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -701,6 +704,24 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { } /** + * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers()} spilled bytes counting. + */ + @Test + public void testSpillFinishedBufferConsumers() throws Exception { + SpillableSubpartition partition = createSubpartition(); + BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE); + + try (BufferConsumer buffer = bufferBuilder.createBufferConsumer()) { + partition.add(buffer); + assertEquals(0, partition.releaseMemory()); + // finally fill the buffer with some bytes + bufferBuilder.appendAndCommit(ByteBuffer.allocate(BUFFER_DATA_SIZE)); + bufferBuilder.finish(); // so that this buffer can be removed from the queue + assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers()); + } + } + + /** * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its * {@link #createBufferFileWriter(FileIOChannel.ID)} method. *
