Repository: flink Updated Branches: refs/heads/master 915213c7a -> 9fb1c23aa
[FLINK-8734][network] fix partition bytes counting and re-enable in tests This closes #5550. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9daf9cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9daf9cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9daf9cc Branch: refs/heads/master Commit: f9daf9cc4243a80b38a1f81bf2b9b37565fe2d61 Parents: 4bf76ae Author: Nico Kruber <[email protected]> Authored: Tue Feb 20 18:05:54 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Feb 27 09:07:13 2018 +0100 ---------------------------------------------------------------------- .../partition/SpillableSubpartitionView.java | 7 +++- .../partition/PipelinedSubpartitionTest.java | 16 ++++---- .../partition/SpillableSubpartitionTest.java | 41 ++++++++++++-------- .../network/partition/SubpartitionTestBase.java | 3 ++ 4 files changed, 42 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 789b3d0..b821dcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -18,15 +18,17 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,6 +117,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { checkState(bufferConsumer.isFinished(), "BufferConsumer must be finished before " + "spilling. Otherwise we would not be able to simply remove it from the queue. This should " + "be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished."); + parent.updateStatistics(buffer); spilledBytes += buffer.getSize(); spillWriter.writeBlock(buffer); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 2ca01c8..528f0e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -207,8 +207,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(1, subpartition.getBuffersInBacklog()); - // TODO: re-enable? -// assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer // ...should have resulted in a notification verify(listener, times(1)).notifyDataAvailable(); @@ -218,6 +217,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { BufferAndBacklog read = view.getNextBuffer(); assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertFalse(read.nextBufferIsEvent()); @@ -231,14 +231,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(2, subpartition.getTotalNumberOfBuffers()); assertEquals(1, subpartition.getBuffersInBacklog()); - // TODO: re-enable? -// assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer verify(listener, times(2)).notifyDataAvailable(); assertFalse(view.nextBufferIsEvent()); read = view.getNextBuffer(); assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertFalse(read.nextBufferIsEvent()); @@ -258,14 +258,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(5, subpartition.getTotalNumberOfBuffers()); assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count) - // TODO: re-enable? -// assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer verify(listener, times(4)).notifyDataAvailable(); assertFalse(view.nextBufferIsEvent()); // the first buffer read = view.getNextBuffer(); assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(1, subpartition.getBuffersInBacklog()); assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertTrue(read.nextBufferIsEvent()); @@ -274,6 +274,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { read = view.getNextBuffer(); assertNotNull(read); assertFalse(read.buffer().isBuffer()); + assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(1, subpartition.getBuffersInBacklog()); assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertFalse(read.nextBufferIsEvent()); @@ -282,6 +283,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { read = view.getNextBuffer(); assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertFalse(read.nextBufferIsEvent()); @@ -473,6 +475,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { Assert.fail("buffer 2 not recycled"); } assertEquals(2, partition.getTotalNumberOfBuffers()); - //assertEquals(2 * 4096, partition.getTotalNumberOfBytes()); + assertEquals(0, partition.getTotalNumberOfBytes()); // buffer data is never consumed } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index 65d98e6..43bcd31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -199,8 +199,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(3, partition.getBuffersInBacklog()); - //TODO: re-enable this? -// assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes()); + assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/releasing the buffers assertFalse(bufferConsumer.isRecycled()); assertEquals(4, partition.releaseMemory()); @@ -305,8 +304,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(5, partition.getTotalNumberOfBuffers()); assertEquals(3, partition.getBuffersInBacklog()); - //TODO: re-enable this? -// assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); + assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener); @@ -319,6 +317,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled) assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers assertEquals(2, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); read.buffer().recycleBuffer(); @@ -332,8 +331,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // still same statistics: assertEquals(5, partition.getTotalNumberOfBuffers()); assertEquals(2, partition.getBuffersInBacklog()); - //TODO: re-enable this? -// assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); + // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory) + assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes()); listener.awaitNotifications(3, 30_000); assertEquals(3, listener.getNumNotifications()); @@ -342,6 +341,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { read = reader.getNextBuffer(); assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics assertEquals(1, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); read.buffer().recycleBuffer(); @@ -353,6 +353,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { read = reader.getNextBuffer(); assertNotNull(read); assertFalse(read.buffer().isBuffer()); + assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(1, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); read.buffer().recycleBuffer(); @@ -362,6 +363,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { read = reader.getNextBuffer(); assertNotNull(read); assertTrue(read.buffer().isBuffer()); + assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertFalse(read.buffer().isRecycled()); @@ -373,6 +375,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertTrue(reader.nextBufferIsEvent()); read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertEquals(EndOfPartitionEvent.class, @@ -421,8 +424,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { partition.finish(); // finish adds an EndOfPartitionEvent assertEquals(1, partition.getTotalNumberOfBuffers()); - //TODO: re-enable this? -// assertEquals(4, partition.getTotalNumberOfBytes()); + // if not spilled, statistics are only updated when consuming the buffers + assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes()); BufferConsumer buffer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE); try { @@ -435,8 +438,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { } // still same statistics assertEquals(1, partition.getTotalNumberOfBuffers()); - //TODO: re-enable this? -// assertEquals(4, partition.getTotalNumberOfBytes()); + // if not spilled, statistics are only updated when consuming the buffers + assertEquals(spilled ? 4 : 0, partition.getTotalNumberOfBytes()); } @Test @@ -546,13 +549,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertFalse("buffer1 should not be recycled (still in the queue)", buffer1.isRecycled()); assertFalse("buffer2 should not be recycled (still in the queue)", buffer2.isRecycled()); assertEquals(2, partition.getTotalNumberOfBuffers()); - //TODO: re-enable this? -// assertEquals(BUFFER_DATA_SIZE * 2, partition.getTotalNumberOfBytes()); + assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed or spilled if (createView) { // Create a read view partition.finish(); partition.createReadView(new NoOpBufferAvailablityListener()); + assertEquals(0, partition.getTotalNumberOfBytes()); // only updated when buffers are consumed or spilled } // one instance of the buffers is placed in the view's nextBuffer and not released @@ -571,8 +574,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { } // note: a view requires a finished partition which has an additional EndOfPartitionEvent assertEquals(2 + (createView ? 1 : 0), partition.getTotalNumberOfBuffers()); - //TODO: re-enable this? -// assertEquals(BUFFER_DATA_SIZE * 2 + (createView ? 4 : 0), partition.getTotalNumberOfBytes()); + // with a view, one buffer remains in nextBuffer and is not counted yet + assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE), partition.getTotalNumberOfBytes()); } /** @@ -699,8 +702,14 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { } // note: in case we create a view, there will be an additional EndOfPartitionEvent assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers()); - //TODO: re-enable this? -// assertEquals((createView ? 4 : 0) + 2 * BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); + if (spilled) { + // with a view, one buffer remains in nextBuffer and is not counted yet + assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : BUFFER_DATA_SIZE), + partition.getTotalNumberOfBytes()); + } else { + // non-spilled byte statistics are only updated when buffers are consumed + assertEquals(0, partition.getTotalNumberOfBytes()); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f9daf9cc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 48846b6..1b861df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -52,15 +52,18 @@ public abstract class SubpartitionTestBase extends TestLogger { try { subpartition.finish(); assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096); assertFalse(subpartition.add(bufferConsumer)); assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers } finally { if (subpartition != null) { subpartition.release();
