This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb18c28388b3613a19e204f88e4c46d4c0add569 Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 18:47:25 2018 +0200 [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest --- .../partition/PipelinedSubpartitionTest.java | 53 +++++++--------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index fc9a643..90bdb82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestProducerSource; @@ -254,7 +253,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { // Empty => should return null assertFalse(view.nextBufferIsEvent()); - assertNull(view.getNextBuffer()); + assertNoNextBuffer(view); assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer() verify(listener, times(0)).notifyDataAvailable(); @@ -270,16 +269,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { verify(listener, times(1)).notifyDataAvailable(); // ...and one available result - assertFalse(view.nextBufferIsEvent()); - BufferAndBacklog read = view.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(0, subpartition.getBuffersInBacklog()); - assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.nextBufferIsEvent()); - assertFalse(view.nextBufferIsEvent()); - assertNull(view.getNextBuffer()); + assertNoNextBuffer(view); assertEquals(0, subpartition.getBuffersInBacklog()); // Add data to the queue... @@ -291,21 +284,15 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer verify(listener, times(2)).notifyDataAvailable(); - assertFalse(view.nextBufferIsEvent()); - read = view.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(0, subpartition.getBuffersInBacklog()); - assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.nextBufferIsEvent()); - assertFalse(view.nextBufferIsEvent()); - assertNull(view.getNextBuffer()); + assertNoNextBuffer(view); assertEquals(0, subpartition.getBuffersInBacklog()); // some tests with events - // fill with: buffer, event , and buffer + // fill with: buffer, event, and buffer subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); assertFalse(view.nextBufferIsEvent()); subpartition.add(createEventBufferConsumer(BUFFER_SIZE)); @@ -318,32 +305,24 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer verify(listener, times(4)).notifyDataAvailable(); - assertFalse(view.nextBufferIsEvent()); // the first buffer - read = view.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + // the first buffer + assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true); assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(1, subpartition.getBuffersInBacklog()); - assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); - assertTrue(read.nextBufferIsEvent()); - assertTrue(view.nextBufferIsEvent()); // the event - read = view.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); + // the event + assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true); assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(1, subpartition.getBuffersInBacklog()); - assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.nextBufferIsEvent()); - assertFalse(view.nextBufferIsEvent()); // the remaining buffer - read = view.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + // the remaining buffer + assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer assertEquals(0, subpartition.getBuffersInBacklog()); - assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.nextBufferIsEvent()); + + // nothing more + assertNoNextBuffer(view); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(5, subpartition.getTotalNumberOfBuffers()); assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
