This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8ed18a6d2247ba87064e3c8d49d5cb6a10df45c
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 18:47:25 2018 +0200

    [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest
---
 .../partition/PipelinedSubpartitionTest.java       | 53 +++++++---------------
 1 file changed, 16 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index fc9a643..90bdb82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
@@ -254,7 +253,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
                // Empty => should return null
                assertFalse(view.nextBufferIsEvent());
-               assertNull(view.getNextBuffer());
+               assertNoNextBuffer(view);
                assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
                verify(listener, times(0)).notifyDataAvailable();
 
@@ -270,16 +269,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                verify(listener, times(1)).notifyDataAvailable();
 
                // ...and one available result
-               assertFalse(view.nextBufferIsEvent());
-               BufferAndBacklog read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
                assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(0, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
-               assertFalse(view.nextBufferIsEvent());
-               assertNull(view.getNextBuffer());
+               assertNoNextBuffer(view);
                assertEquals(0, subpartition.getBuffersInBacklog());
 
                // Add data to the queue...
@@ -291,21 +284,15 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                verify(listener, times(2)).notifyDataAvailable();
 
-               assertFalse(view.nextBufferIsEvent());
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
                assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(0, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
-               assertFalse(view.nextBufferIsEvent());
-               assertNull(view.getNextBuffer());
+               assertNoNextBuffer(view);
                assertEquals(0, subpartition.getBuffersInBacklog());
 
                // some tests with events
 
-               // fill with: buffer, event , and buffer
+               // fill with: buffer, event, and buffer
                subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
                assertFalse(view.nextBufferIsEvent());
                subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
@@ -318,32 +305,24 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                verify(listener, times(4)).notifyDataAvailable();
 
-               assertFalse(view.nextBufferIsEvent()); // the first buffer
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               // the first buffer
+               assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
                assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertTrue(read.nextBufferIsEvent());
 
-               assertTrue(view.nextBufferIsEvent()); // the event
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertFalse(read.buffer().isBuffer());
+               // the event
+               assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
                assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
 
-               assertFalse(view.nextBufferIsEvent()); // the remaining buffer
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               // the remaining buffer
+               assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
                assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
                assertEquals(0, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
+
+               // nothing more
+               assertNoNextBuffer(view);
+               assertEquals(0, subpartition.getBuffersInBacklog());
 
                assertEquals(5, subpartition.getTotalNumberOfBuffers());
                assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());

Reply via email to