This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1abe6aa0940321891233f60905a8152d9ca9c5e9
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 18:44:49 2018 +0200

    [hotfix][network][tests] add readView.nextBufferIsEvent to 
assertNextBufferOrEvent()
---
 .../network/partition/SpillableSubpartitionTest.java | 20 ++++++--------------
 .../io/network/partition/SubpartitionTestBase.java   |  2 ++
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 817795c..57d2cd6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -228,24 +228,20 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                assertEquals(1, listener.getNumNotifications());
-
                assertFalse(reader.nextBufferIsEvent()); // buffer
+
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
                assertEquals(2, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // event
                assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
                assertEquals(0, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // end of partition 
event
                assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(0, partition.getBuffersInBacklog());
 
@@ -314,24 +310,20 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                assertEquals(1, listener.getNumNotifications());
-
                assertFalse(reader.nextBufferIsEvent()); // full buffer
+
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
                assertEquals(2, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // full buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // event
                assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // partial buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, 
true);
                assertEquals(0, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // end of partition 
event
                assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(0, partition.getBuffersInBacklog());
 
@@ -370,6 +362,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertFalse(bufferConsumer.isRecycled());
 
                assertFalse(reader.nextBufferIsEvent());
+
                // first buffer (non-spilled)
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
false);
                assertEquals(BUFFER_DATA_SIZE, 
partition.getTotalNumberOfBytes()); // only updated when getting/spilling the 
buffers
@@ -397,19 +390,19 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                Buffer buffer = bufferConsumer.build();
                buffer.retainBuffer();
 
-               assertFalse(reader.nextBufferIsEvent()); // second buffer 
(retained in SpillableSubpartition#nextBuffer)
+               // second buffer (retained in SpillableSubpartition#nextBuffer)
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, 
false);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer 
statistics
                assertEquals(1, partition.getBuffersInBacklog());
 
                bufferConsumer.close(); // recycle the retained buffer from 
above (should be the last reference!)
 
-               assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
+               // the event (spilled)
                assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // last buffer 
(spilled)
+               // last buffer (spilled)
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
@@ -418,7 +411,6 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertTrue(buffer.isRecycled());
 
                // End of partition
-               assertTrue(reader.nextBufferIsEvent());
                assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 8c90215..5989cf8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -207,6 +207,8 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
                        assertEquals("backlog", expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
                        assertEquals("next is event", expectedNextBufferIsEvent,
                                bufferAndBacklog.nextBufferIsEvent());
+                       assertEquals("next is event", expectedNextBufferIsEvent,
+                               readView.nextBufferIsEvent());
 
                        assertFalse("not recycled", 
bufferAndBacklog.buffer().isRecycled());
                } finally {

Reply via email to