[FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests

+ also improve the subpartition tests in general to reduce some duplication

This closes #5581


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1a969f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1a969f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1a969f7

Branch: refs/heads/release-1.5
Commit: d1a969f7ad018ef44f40f974eb49ba004494fcdf
Parents: 835adcc
Author: Nico Kruber <n...@data-artisans.com>
Authored: Fri Feb 23 12:13:20 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 9 17:01:54 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionView.java    |   2 +-
 .../partition/PipelinedSubpartitionTest.java    |  11 +-
 .../partition/SpillableSubpartitionTest.java    | 130 ++++++-------------
 .../network/partition/SubpartitionTestBase.java |  78 ++++++++++-
 4 files changed, 121 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 0f51bc8..65790d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -167,7 +167,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 
                                parent.updateStatistics(current);
                                // if we are spilled (but still process a non-spilled nextBuffer), we don't know the
-                               // state of nextBufferIsEvent...
+                               // state of nextBufferIsEvent or whether more buffers are available
                                if (spilledView == null) {
                                        return new BufferAndBacklog(current, 
isMoreAvailable, newBacklog, nextBufferIsEvent);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index ee678ab..bc66c9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -135,7 +135,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                        
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
                        subpartition.add(bufferBuilder.createBufferConsumer());
 
-                       assertNextBuffer(readView, 1024, false, 1);
+                       // note that since the buffer builder is not finished, there is still a retained instance!
+                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
                        assertEquals(1, subpartition.getBuffersInBacklog());
                } finally {
                        readView.releaseAllResources();
@@ -157,7 +158,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                        subpartition.add(createFilledBufferConsumer(1025)); // 
finished
                        
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
 
-                       assertNextBuffer(readView, 1025, false, 1);
+                       assertNextBuffer(readView, 1025, false, 1, false, true);
                } finally {
                        subpartition.release();
                }
@@ -178,8 +179,8 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                        
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
                        subpartition.flush();
 
-                       assertNextBuffer(readView, 1025, true, 1);
-                       assertNextBuffer(readView, 1024, false, 1);
+                       assertNextBuffer(readView, 1025, true, 1, false, true);
+                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
                } finally {
                        subpartition.release();
                }
@@ -208,7 +209,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                        subpartition.add(createFilledBufferConsumer(1024));
                        assertEquals(2, 
availablityListener.getNumNotifications());
 
-                       assertNextBuffer(readView, 1024, false, 0);
+                       assertNextBuffer(readView, 1024, false, 0, false, true);
                } finally {
                        readView.releaseAllResources();
                        subpartition.release();

http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index e41a85c..840669e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -52,7 +52,6 @@ import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -190,10 +189,13 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                SpillableSubpartition partition = createSubpartition();
 
                BufferConsumer bufferConsumer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
+               BufferConsumer eventBufferConsumer =
+                       EventSerializer.toBufferConsumer(new 
CancelCheckpointMarker(1));
+               final int eventSize = eventBufferConsumer.getWrittenBytes();
 
                partition.add(bufferConsumer.copy());
                partition.add(bufferConsumer.copy());
-               
partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
+               partition.add(eventBufferConsumer);
                partition.add(bufferConsumer);
 
                assertEquals(4, partition.getTotalNumberOfBuffers());
@@ -207,13 +209,13 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                // still same statistics
                assertEquals(4, partition.getTotalNumberOfBuffers());
                assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(BUFFER_DATA_SIZE * 4, 
partition.getTotalNumberOfBytes());
+               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, 
partition.getTotalNumberOfBytes());
 
                partition.finish();
                // + one EndOfPartitionEvent
                assertEquals(5, partition.getTotalNumberOfBuffers());
                assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(BUFFER_DATA_SIZE * 4 + 4, 
partition.getTotalNumberOfBytes());
+               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes());
 
                AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
@@ -221,59 +223,24 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(1, listener.getNumNotifications());
 
                assertFalse(reader.nextBufferIsEvent()); // buffer
-               BufferAndBacklog read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
                assertEquals(2, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.buffer().isRecycled());
-               read.buffer().recycleBuffer();
-               assertTrue(read.buffer().isRecycled());
-               assertFalse(read.nextBufferIsEvent());
 
                assertFalse(reader.nextBufferIsEvent()); // buffer
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
                assertEquals(1, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.buffer().isRecycled());
-               read.buffer().recycleBuffer();
-               assertTrue(read.buffer().isRecycled());
-               assertTrue(read.nextBufferIsEvent());
 
                assertTrue(reader.nextBufferIsEvent()); // event
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertFalse(read.buffer().isBuffer());
+               assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(1, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               read.buffer().recycleBuffer();
-               assertFalse(read.nextBufferIsEvent());
 
                assertFalse(reader.nextBufferIsEvent()); // buffer
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
                assertEquals(0, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.buffer().isRecycled());
-               read.buffer().recycleBuffer();
-               assertTrue(read.buffer().isRecycled());
-               assertTrue(read.nextBufferIsEvent());
 
                assertTrue(reader.nextBufferIsEvent()); // end of partition 
event
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertFalse(read.buffer().isBuffer());
+               assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(0, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertEquals(EndOfPartitionEvent.class,
-                       EventSerializer.fromBuffer(read.buffer(), 
ClassLoader.getSystemClassLoader()).getClass());
-               assertFalse(read.buffer().isRecycled());
-               read.buffer().recycleBuffer();
-               assertTrue(read.buffer().isRecycled());
-               assertFalse(read.nextBufferIsEvent());
 
                // finally check that the bufferConsumer has been freed after a 
successful (or failed) write
                final long deadline = System.currentTimeMillis() + 30_000L; // 
30 secs
@@ -292,10 +259,13 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                SpillableSubpartition partition = createSubpartition();
 
                BufferConsumer bufferConsumer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
+               BufferConsumer eventBufferConsumer =
+                       EventSerializer.toBufferConsumer(new 
CancelCheckpointMarker(1));
+               final int eventSize = eventBufferConsumer.getWrittenBytes();
 
                partition.add(bufferConsumer.copy());
                partition.add(bufferConsumer.copy());
-               
partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
+               partition.add(eventBufferConsumer);
                partition.add(bufferConsumer);
                partition.finish();
 
@@ -311,17 +281,12 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertFalse(bufferConsumer.isRecycled());
 
                assertFalse(reader.nextBufferIsEvent());
-               BufferAndBacklog read = reader.getNextBuffer(); // first buffer 
(non-spilled)
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
+               // first buffer (non-spilled)
+               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
false);
                assertEquals(BUFFER_DATA_SIZE, 
partition.getTotalNumberOfBytes()); // only updated when getting/spilling the 
buffers
                assertEquals(2, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               read.buffer().recycleBuffer();
-               assertTrue(read.isMoreAvailable());
                assertEquals(1, listener.getNumNotifications()); // since 
isMoreAvailable is set to true, no need for notification
                assertFalse(bufferConsumer.isRecycled());
-               assertFalse(read.nextBufferIsEvent());
 
                // Spill now
                assertEquals(3, partition.releaseMemory());
@@ -330,59 +295,44 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(5, partition.getTotalNumberOfBuffers());
                assertEquals(2, partition.getBuffersInBacklog());
                // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
-               assertEquals(BUFFER_DATA_SIZE * 3 + 4, 
partition.getTotalNumberOfBytes());
+               assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, 
partition.getTotalNumberOfBytes());
 
+               // wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!)
                listener.awaitNotifications(2, 30_000);
                // Spiller finished
                assertEquals(2, listener.getNumNotifications());
 
+               // after consuming and releasing the next buffer, the bufferConsumer may be freed,
+               // depending on the timing of the last write operation
+               // -> retain once so that we can check below
+               Buffer buffer = bufferConsumer.build();
+               buffer.retainBuffer();
+
                assertFalse(reader.nextBufferIsEvent()); // second buffer 
(retained in SpillableSubpartition#nextBuffer)
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
-               assertEquals(BUFFER_DATA_SIZE * 4 + 4, 
partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer 
statistics
+               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, 
false);
+               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer 
statistics
                assertEquals(1, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               read.buffer().recycleBuffer();
-               // now the bufferConsumer may be freed, depending on the timing 
of the write operation
-               // -> let's do this check at the end of the test (to save some 
time)
-               assertTrue(read.nextBufferIsEvent());
+
+               bufferConsumer.close(); // recycle the retained buffer from 
above (should be the last reference!)
 
                assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertFalse(read.buffer().isBuffer());
-               assertEquals(BUFFER_DATA_SIZE * 4 + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
+               assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
+               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(1, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               read.buffer().recycleBuffer();
-               assertFalse(read.nextBufferIsEvent());
 
                assertFalse(reader.nextBufferIsEvent()); // last buffer 
(spilled)
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
-               assertEquals(BUFFER_DATA_SIZE * 4 + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
+               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
+               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.buffer().isRecycled());
-               read.buffer().recycleBuffer();
-               assertTrue(read.buffer().isRecycled());
-               assertTrue(read.nextBufferIsEvent());
+
+               buffer.recycleBuffer();
+               assertTrue(buffer.isRecycled());
 
                // End of partition
                assertTrue(reader.nextBufferIsEvent());
-               read = reader.getNextBuffer();
-               assertNotNull(read);
-               assertEquals(BUFFER_DATA_SIZE * 4 + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
+               assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
+               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
-               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertEquals(EndOfPartitionEvent.class,
-                       EventSerializer.fromBuffer(read.buffer(), 
ClassLoader.getSystemClassLoader()).getClass());
-               assertFalse(read.buffer().isRecycled());
-               read.buffer().recycleBuffer();
-               assertTrue(read.buffer().isRecycled());
-               assertFalse(read.nextBufferIsEvent());
 
                // finally check that the bufferConsumer has been freed after a 
successful (or failed) write
                final long deadline = System.currentTimeMillis() + 30_000L; // 
30 secs

http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index a3f18f6..8c90215 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,19 +18,26 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -138,11 +145,74 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
                        ResultSubpartitionView readView,
                        int expectedReadableBufferSize,
                        boolean expectedIsMoreAvailable,
-                       int expectedBuffersInBacklog) throws IOException, 
InterruptedException {
+                       int expectedBuffersInBacklog,
+                       boolean expectedNextBufferIsEvent,
+                       boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+               assertNextBufferOrEvent(
+                       readView,
+                       expectedReadableBufferSize,
+                       true,
+                       null,
+                       expectedIsMoreAvailable,
+                       expectedBuffersInBacklog,
+                       expectedNextBufferIsEvent,
+                       expectedRecycledAfterRecycle);
+       }
+
+       static void assertNextEvent(
+                       ResultSubpartitionView readView,
+                       int expectedReadableBufferSize,
+                       Class<? extends AbstractEvent> expectedEventClass,
+                       boolean expectedIsMoreAvailable,
+                       int expectedBuffersInBacklog,
+                       boolean expectedNextBufferIsEvent,
+                       boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+               assertNextBufferOrEvent(
+                       readView,
+                       expectedReadableBufferSize,
+                       false,
+                       expectedEventClass,
+                       expectedIsMoreAvailable,
+                       expectedBuffersInBacklog,
+                       expectedNextBufferIsEvent,
+                       expectedRecycledAfterRecycle);
+       }
+
+       private static void assertNextBufferOrEvent(
+                       ResultSubpartitionView readView,
+                       int expectedReadableBufferSize,
+                       boolean expectedIsBuffer,
+                       @Nullable Class<? extends AbstractEvent> 
expectedEventClass,
+                       boolean expectedIsMoreAvailable,
+                       int expectedBuffersInBacklog,
+                       boolean expectedNextBufferIsEvent,
+                       boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+               checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
                ResultSubpartition.BufferAndBacklog bufferAndBacklog = 
readView.getNextBuffer();
-               assertEquals(expectedReadableBufferSize, 
bufferAndBacklog.buffer().readableBytes());
-               assertEquals(expectedIsMoreAvailable, 
bufferAndBacklog.isMoreAvailable());
-               assertEquals(expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
+               assertNotNull(bufferAndBacklog);
+               try {
+                       assertEquals("buffer size", expectedReadableBufferSize,
+                               bufferAndBacklog.buffer().readableBytes());
+                       assertEquals("buffer or event", expectedIsBuffer,
+                               bufferAndBacklog.buffer().isBuffer());
+                       if (expectedEventClass != null) {
+                               assertThat(EventSerializer
+                                               
.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
+                                       instanceOf(expectedEventClass));
+                       }
+                       assertEquals("more available", expectedIsMoreAvailable,
+                               bufferAndBacklog.isMoreAvailable());
+                       assertEquals("more available", expectedIsMoreAvailable, 
readView.isAvailable());
+                       assertEquals("backlog", expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
+                       assertEquals("next is event", expectedNextBufferIsEvent,
+                               bufferAndBacklog.nextBufferIsEvent());
+
+                       assertFalse("not recycled", 
bufferAndBacklog.buffer().isRecycled());
+               } finally {
+                       bufferAndBacklog.buffer().recycleBuffer();
+               }
+               assertEquals("recycled", expectedRecycledAfterRecycle, 
bufferAndBacklog.buffer().isRecycled());
        }
 
        protected void assertNoNextBuffer(ResultSubpartitionView readView) 
throws IOException, InterruptedException {

Reply via email to