[FLINK-8733][network] fix SpillableSubpartition#spillFinishedBufferConsumers() not counting spilled bytes

This closes #5549.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4bf76ae6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4bf76ae6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4bf76ae6

Branch: refs/heads/master
Commit: 4bf76ae69e3f22e25c2dad3e802be094554b5d43
Parents: 915213c
Author: Nico Kruber <[email protected]>
Authored: Tue Feb 20 18:04:12 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Feb 27 09:07:13 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartition.java        |  5 ++++-
 .../partition/SpillableSubpartitionTest.java    | 21 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4bf76ae6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 8758b34..6ac493e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -240,13 +241,15 @@ class SpillableSubpartition extends ResultSubpartition {
                return 0;
        }
 
-       private long spillFinishedBufferConsumers() throws IOException {
+       @VisibleForTesting
+       protected long spillFinishedBufferConsumers() throws IOException {
                long spilledBytes = 0;
 
                while (!buffers.isEmpty()) {
                        BufferConsumer bufferConsumer = buffers.peek();
                        Buffer buffer = bufferConsumer.build();
                        updateStatistics(buffer);
+                       spilledBytes += buffer.getSize();
                        spillWriter.writeBlock(buffer);
 
                        if (bufferConsumer.isFinished()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4bf76ae6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 9dc7bed..65d98e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
@@ -40,12 +41,14 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -701,6 +704,24 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
        }
 
        /**
+        * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers()} spilled bytes counting.
+        */
+       @Test
+       public void testSpillFinishedBufferConsumers() throws Exception {
+               SpillableSubpartition partition = createSubpartition();
+               BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE);
+
+               try (BufferConsumer buffer = bufferBuilder.createBufferConsumer()) {
+                       partition.add(buffer);
+                       assertEquals(0, partition.releaseMemory());
+                       // finally fill the buffer with some bytes
+                       bufferBuilder.appendAndCommit(ByteBuffer.allocate(BUFFER_DATA_SIZE));
+                       bufferBuilder.finish(); // so that this buffer can be removed from the queue
+                       assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers());
+               }
+       }
+
+       /**
         * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
         * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
         *

Reply via email to