This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e018c6968a3d5c2cda84bdea08bf3eb8e9e4a1c9 Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 18:44:49 2018 +0200 [hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent() --- .../network/partition/SpillableSubpartitionTest.java | 20 ++++++-------------- .../io/network/partition/SubpartitionTestBase.java | 2 ++ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index 817795c..57d2cd6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -228,24 +228,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); assertEquals(1, listener.getNumNotifications()); - assertFalse(reader.nextBufferIsEvent()); // buffer + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true); assertEquals(2, partition.getBuffersInBacklog()); - assertFalse(reader.nextBufferIsEvent()); // buffer assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true); assertEquals(1, partition.getBuffersInBacklog()); - assertTrue(reader.nextBufferIsEvent()); // event assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); assertEquals(1, partition.getBuffersInBacklog()); - assertFalse(reader.nextBufferIsEvent()); // buffer assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true); assertEquals(0, partition.getBuffersInBacklog()); - assertTrue(reader.nextBufferIsEvent()); // end of partition event assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); assertEquals(0, partition.getBuffersInBacklog()); @@ -314,24 +310,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); assertEquals(1, listener.getNumNotifications()); - assertFalse(reader.nextBufferIsEvent()); // full buffer + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true); assertEquals(2, partition.getBuffersInBacklog()); - assertFalse(reader.nextBufferIsEvent()); // full buffer assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true); assertEquals(1, partition.getBuffersInBacklog()); - assertTrue(reader.nextBufferIsEvent()); // event assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); assertEquals(1, partition.getBuffersInBacklog()); - assertFalse(reader.nextBufferIsEvent()); // partial buffer assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true); assertEquals(0, partition.getBuffersInBacklog()); - assertTrue(reader.nextBufferIsEvent()); // end of partition event assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); assertEquals(0, partition.getBuffersInBacklog()); @@ -370,6 +362,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertFalse(bufferConsumer.isRecycled()); assertFalse(reader.nextBufferIsEvent()); + // first buffer (non-spilled) assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false); assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers @@ -397,19 +390,19 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { Buffer buffer = bufferConsumer.build(); buffer.retainBuffer(); - assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer) + // second buffer (retained in SpillableSubpartition#nextBuffer) assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false); assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics assertEquals(1, partition.getBuffersInBacklog()); bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!) - assertTrue(reader.nextBufferIsEvent()); // the event (spilled) + // the event (spilled) assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(1, partition.getBuffersInBacklog()); - assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled) + // last buffer (spilled) assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true); assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); @@ -418,7 +411,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertTrue(buffer.isRecycled()); // End of partition - assertTrue(reader.nextBufferIsEvent()); assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 8c90215..5989cf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -207,6 +207,8 @@ public abstract class SubpartitionTestBase extends TestLogger { assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); assertEquals("next is event", expectedNextBufferIsEvent, bufferAndBacklog.nextBufferIsEvent()); + assertEquals("next is event", expectedNextBufferIsEvent, + readView.nextBufferIsEvent()); assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled()); } finally {
