Repository: flink
Updated Branches:
  refs/heads/master 915213c7a -> 9fb1c23aa


[FLINK-8734][network] fix partition bytes counting and re-enable in tests

This closes #5550.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9daf9cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9daf9cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9daf9cc

Branch: refs/heads/master
Commit: f9daf9cc4243a80b38a1f81bf2b9b37565fe2d61
Parents: 4bf76ae
Author: Nico Kruber <[email protected]>
Authored: Tue Feb 20 18:05:54 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Feb 27 09:07:13 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionView.java    |  7 +++-
 .../partition/PipelinedSubpartitionTest.java    | 16 ++++----
 .../partition/SpillableSubpartitionTest.java    | 41 ++++++++++++--------
 .../network/partition/SubpartitionTestBase.java |  3 ++
 4 files changed, 42 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 789b3d0..b821dcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -18,15 +18,17 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,6 +117,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
                                                checkState(bufferConsumer.isFinished(), "BufferConsumer must be finished before " +
                                                        "spilling. Otherwise we would not be able to simply remove it from the queue. This should " +
                                                        "be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished.");
+                                               parent.updateStatistics(buffer);
                                                spilledBytes += buffer.getSize();
                                                spillWriter.writeBlock(buffer);
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 2ca01c8..528f0e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -207,8 +207,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
                assertEquals(1, subpartition.getTotalNumberOfBuffers());
                assertEquals(1, subpartition.getBuffersInBacklog());
-               // TODO: re-enable?
-//             assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+               assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 
                // ...should have resulted in a notification
                verify(listener, times(1)).notifyDataAvailable();
@@ -218,6 +217,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                BufferAndBacklog read = view.getNextBuffer();
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(0, subpartition.getBuffersInBacklog());
                assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
                assertFalse(read.nextBufferIsEvent());
@@ -231,14 +231,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
                assertEquals(2, subpartition.getTotalNumberOfBuffers());
                assertEquals(1, subpartition.getBuffersInBacklog());
-               // TODO: re-enable?
-//             assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+               assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                verify(listener, times(2)).notifyDataAvailable();
 
                assertFalse(view.nextBufferIsEvent());
                read = view.getNextBuffer();
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(0, subpartition.getBuffersInBacklog());
                assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
                assertFalse(read.nextBufferIsEvent());
@@ -258,14 +258,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
                assertEquals(5, subpartition.getTotalNumberOfBuffers());
                assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
-               // TODO: re-enable?
-//             assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+               assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                verify(listener, times(4)).notifyDataAvailable();
 
                assertFalse(view.nextBufferIsEvent()); // the first buffer
                read = view.getNextBuffer();
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(1, subpartition.getBuffersInBacklog());
                assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
                assertTrue(read.nextBufferIsEvent());
@@ -274,6 +274,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                read = view.getNextBuffer();
                assertNotNull(read);
                assertFalse(read.buffer().isBuffer());
+               assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(1, subpartition.getBuffersInBacklog());
                assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
                assertFalse(read.nextBufferIsEvent());
@@ -282,6 +283,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                read = view.getNextBuffer();
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(0, subpartition.getBuffersInBacklog());
                assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
                assertFalse(read.nextBufferIsEvent());
@@ -473,6 +475,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                        Assert.fail("buffer 2 not recycled");
                }
                assertEquals(2, partition.getTotalNumberOfBuffers());
-               //assertEquals(2 * 4096, partition.getTotalNumberOfBytes());
+               assertEquals(0, partition.getTotalNumberOfBytes()); // buffer data is never consumed
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 65d98e6..43bcd31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -199,8 +199,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
                assertEquals(4, partition.getTotalNumberOfBuffers());
                assertEquals(3, partition.getBuffersInBacklog());
-               //TODO: re-enable this?
-//             assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes());
+               assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/releasing the buffers
 
                assertFalse(bufferConsumer.isRecycled());
                assertEquals(4, partition.releaseMemory());
@@ -305,8 +304,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
                assertEquals(5, partition.getTotalNumberOfBuffers());
                assertEquals(3, partition.getBuffersInBacklog());
-               //TODO: re-enable this?
-//             assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
+               assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
 
                AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
                SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener);
@@ -319,6 +317,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled)
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
                assertEquals(2, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
                read.buffer().recycleBuffer();
@@ -332,8 +331,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                // still same statistics:
                assertEquals(5, partition.getTotalNumberOfBuffers());
                assertEquals(2, partition.getBuffersInBacklog());
-               //TODO: re-enable this?
-//             assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
+               // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
+               assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
 
                listener.awaitNotifications(3, 30_000);
                assertEquals(3, listener.getNumNotifications());
@@ -342,6 +341,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                read = reader.getNextBuffer();
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
                assertEquals(1, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
                read.buffer().recycleBuffer();
@@ -353,6 +353,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                read = reader.getNextBuffer();
                assertNotNull(read);
                assertFalse(read.buffer().isBuffer());
+               assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(1, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
                read.buffer().recycleBuffer();
@@ -362,6 +363,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                read = reader.getNextBuffer();
                assertNotNull(read);
                assertTrue(read.buffer().isBuffer());
+               assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
                assertFalse(read.buffer().isRecycled());
@@ -373,6 +375,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                assertTrue(reader.nextBufferIsEvent());
                read = reader.getNextBuffer();
                assertNotNull(read);
+               assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
                assertEquals(EndOfPartitionEvent.class,
@@ -421,8 +424,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                partition.finish();
                // finish adds an EndOfPartitionEvent
                assertEquals(1, partition.getTotalNumberOfBuffers());
-               //TODO: re-enable this?
-//             assertEquals(4, partition.getTotalNumberOfBytes());
+               // if not spilled, statistics are only updated when consuming the buffers
+               assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes());
 
                BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
                try {
@@ -435,8 +438,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                }
                // still same statistics
                assertEquals(1, partition.getTotalNumberOfBuffers());
-               //TODO: re-enable this?
-//             assertEquals(4, partition.getTotalNumberOfBytes());
+               // if not spilled, statistics are only updated when consuming the buffers
+               assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes());
        }
 
        @Test
@@ -546,13 +549,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                        assertFalse("buffer1 should not be recycled (still in the queue)", buffer1.isRecycled());
                        assertFalse("buffer2 should not be recycled (still in the queue)", buffer2.isRecycled());
                        assertEquals(2, partition.getTotalNumberOfBuffers());
-                       //TODO: re-enable this?
-//                     assertEquals(BUFFER_DATA_SIZE * 2, partition.getTotalNumberOfBytes());
+                       assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed or spilled
 
                        if (createView) {
                                // Create a read view
                                partition.finish();
                                partition.createReadView(new NoOpBufferAvailablityListener());
+                               assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed or spilled
                        }
 
                        // one instance of the buffers is placed in the view's nextBuffer and not released
@@ -571,8 +574,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                }
                // note: a view requires a finished partition which has an additional EndOfPartitionEvent
                assertEquals(2 + (createView ? 1 : 0), partition.getTotalNumberOfBuffers());
-               //TODO: re-enable this?
-//             assertEquals(BUFFER_DATA_SIZE * 2 + (createView ? 4 : 0), partition.getTotalNumberOfBytes());
+               // with a view, one buffer remains in nextBuffer and is not counted yet
+               assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE), partition.getTotalNumberOfBytes());
        }
 
        /**
@@ -699,8 +702,14 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                }
                // note: in case we create a view, there will be an additional EndOfPartitionEvent
                assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers());
-               //TODO: re-enable this?
-//             assertEquals((createView ? 4 : 0) + 2 * BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes());
+               if (spilled) {
+                       // with a view, one buffer remains in nextBuffer and is not counted yet
+                       assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE),
+                               partition.getTotalNumberOfBytes());
+               } else {
+                       // non-spilled byte statistics are only updated when buffers are consumed
+                       assertEquals(0, partition.getTotalNumberOfBytes());
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 48846b6..1b861df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -52,15 +52,18 @@ public abstract class SubpartitionTestBase extends TestLogger {
                try {
                        subpartition.finish();
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
 
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
                        assertEquals(0, subpartition.getBuffersInBacklog());
+                       assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
 
                        BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096);
 
                        assertFalse(subpartition.add(bufferConsumer));
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
                        assertEquals(0, subpartition.getBuffersInBacklog());
+                       assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
                } finally {
                        if (subpartition != null) {
                                subpartition.release();

Reply via email to