[FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests + also improve the subpartition tests in general to reduce some duplication
This closes #5581 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1a969f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1a969f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1a969f7 Branch: refs/heads/release-1.5 Commit: d1a969f7ad018ef44f40f974eb49ba004494fcdf Parents: 835adcc Author: Nico Kruber <n...@data-artisans.com> Authored: Fri Feb 23 12:13:20 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 9 17:01:54 2018 +0100 ---------------------------------------------------------------------- .../partition/SpillableSubpartitionView.java | 2 +- .../partition/PipelinedSubpartitionTest.java | 11 +- .../partition/SpillableSubpartitionTest.java | 130 ++++++------------- .../network/partition/SubpartitionTestBase.java | 78 ++++++++++- 4 files changed, 121 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 0f51bc8..65790d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -167,7 +167,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { parent.updateStatistics(current); // if we are spilled (but still process a non-spilled nextBuffer), we don't know the - // state of nextBufferIsEvent... + // state of nextBufferIsEvent or whether more buffers are available if (spilledView == null) { return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent); } http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index ee678ab..bc66c9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -135,7 +135,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024)); subpartition.add(bufferBuilder.createBufferConsumer()); - assertNextBuffer(readView, 1024, false, 1); + // note that since the buffer builder is not finished, there is still a retained instance! + assertNextBuffer(readView, 1024, false, 1, false, false); assertEquals(1, subpartition.getBuffersInBacklog()); } finally { readView.releaseAllResources(); @@ -157,7 +158,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { subpartition.add(createFilledBufferConsumer(1025)); // finished subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished - assertNextBuffer(readView, 1025, false, 1); + assertNextBuffer(readView, 1025, false, 1, false, true); } finally { subpartition.release(); } @@ -178,8 +179,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished subpartition.flush(); - assertNextBuffer(readView, 1025, true, 1); - assertNextBuffer(readView, 1024, false, 1); + assertNextBuffer(readView, 1025, true, 1, false, true); + assertNextBuffer(readView, 1024, false, 1, false, false); } finally { subpartition.release(); } @@ -208,7 +209,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { subpartition.add(createFilledBufferConsumer(1024)); assertEquals(2, availablityListener.getNumNotifications()); - assertNextBuffer(readView, 1024, false, 0); + assertNextBuffer(readView, 1024, false, 0, false, true); } finally { readView.releaseAllResources(); subpartition.release(); http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index e41a85c..840669e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -24,13 +24,13 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.junit.AfterClass; import org.junit.Assert; @@ -52,7 +52,6 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils. import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -190,10 +189,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpillableSubpartition partition = createSubpartition(); BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE); + BufferConsumer eventBufferConsumer = + EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1)); + final int eventSize = eventBufferConsumer.getWrittenBytes(); partition.add(bufferConsumer.copy()); partition.add(bufferConsumer.copy()); - partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE)); + partition.add(eventBufferConsumer); partition.add(bufferConsumer); assertEquals(4, partition.getTotalNumberOfBuffers()); @@ -207,13 +209,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // still same statistics assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(3, partition.getBuffersInBacklog()); - assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes()); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, partition.getTotalNumberOfBytes()); partition.finish(); // + one EndOfPartitionEvent assertEquals(5, partition.getTotalNumberOfBuffers()); assertEquals(3, partition.getBuffersInBacklog()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); @@ -221,59 +223,24 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(1, listener.getNumNotifications()); assertFalse(reader.nextBufferIsEvent()); // buffer - BufferAndBacklog read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true); assertEquals(2, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // buffer - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true); assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); assertTrue(reader.nextBufferIsEvent()); // event - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); + assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // buffer - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true); assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); assertTrue(reader.nextBufferIsEvent()); // end of partition event - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); + assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertEquals(EndOfPartitionEvent.class, - EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); // finally check that the bufferConsumer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs @@ -292,10 +259,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpillableSubpartition partition = createSubpartition(); BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE); + BufferConsumer eventBufferConsumer = + EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1)); + final int eventSize = eventBufferConsumer.getWrittenBytes(); partition.add(bufferConsumer.copy()); partition.add(bufferConsumer.copy()); - partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE)); + partition.add(eventBufferConsumer); partition.add(bufferConsumer); partition.finish(); @@ -311,17 +281,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertFalse(bufferConsumer.isRecycled()); assertFalse(reader.nextBufferIsEvent()); - BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled) - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + // first buffer (non-spilled) + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false); assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers assertEquals(2, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - assertTrue(read.isMoreAvailable()); assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification assertFalse(bufferConsumer.isRecycled()); - assertFalse(read.nextBufferIsEvent()); // Spill now assertEquals(3, partition.releaseMemory()); @@ -330,59 +295,44 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(5, partition.getTotalNumberOfBuffers()); assertEquals(2, partition.getBuffersInBacklog()); // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory) - assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes()); + assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, partition.getTotalNumberOfBytes()); + // wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!) listener.awaitNotifications(2, 30_000); // Spiller finished assertEquals(2, listener.getNumNotifications()); + // after consuming and releasing the next buffer, the bufferConsumer may be freed, + // depending on the timing of the last write operation + // -> retain once so that we can check below + Buffer buffer = bufferConsumer.build(); + buffer.retainBuffer(); + assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer) - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - // now the bufferConsumer may be freed, depending on the timing of the write operation - // -> let's do this check at the end of the test (to save some time) - assertTrue(read.nextBufferIsEvent()); + + bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!) assertTrue(reader.nextBufferIsEvent()); // the event (spilled) - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling + assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - read.buffer().recycleBuffer(); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled) - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); + + buffer.recycleBuffer(); + assertTrue(buffer.isRecycled()); // End of partition assertTrue(reader.nextBufferIsEvent()); - read = reader.getNextBuffer(); - assertNotNull(read); - assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling + assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true); + assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling assertEquals(0, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertEquals(EndOfPartitionEvent.class, - EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); // finally check that the bufferConsumer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index a3f18f6..8c90215 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -18,19 +18,26 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.util.TestLogger; import org.junit.Test; +import javax.annotation.Nullable; + import java.io.IOException; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -138,11 +145,74 @@ public abstract class SubpartitionTestBase extends TestLogger { ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsMoreAvailable, - int expectedBuffersInBacklog) throws IOException, InterruptedException { + int expectedBuffersInBacklog, + boolean expectedNextBufferIsEvent, + boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException { + assertNextBufferOrEvent( + readView, + expectedReadableBufferSize, + true, + null, + expectedIsMoreAvailable, + expectedBuffersInBacklog, + expectedNextBufferIsEvent, + expectedRecycledAfterRecycle); + } + + static void assertNextEvent( + ResultSubpartitionView readView, + int expectedReadableBufferSize, + Class<? extends AbstractEvent> expectedEventClass, + boolean expectedIsMoreAvailable, + int expectedBuffersInBacklog, + boolean expectedNextBufferIsEvent, + boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException { + assertNextBufferOrEvent( + readView, + expectedReadableBufferSize, + false, + expectedEventClass, + expectedIsMoreAvailable, + expectedBuffersInBacklog, + expectedNextBufferIsEvent, + expectedRecycledAfterRecycle); + } + + private static void assertNextBufferOrEvent( + ResultSubpartitionView readView, + int expectedReadableBufferSize, + boolean expectedIsBuffer, + @Nullable Class<? extends AbstractEvent> expectedEventClass, + boolean expectedIsMoreAvailable, + int expectedBuffersInBacklog, + boolean expectedNextBufferIsEvent, + boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException { + checkArgument(expectedEventClass == null || !expectedIsBuffer); + ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer(); - assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes()); - assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable()); - assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); + assertNotNull(bufferAndBacklog); + try { + assertEquals("buffer size", expectedReadableBufferSize, + bufferAndBacklog.buffer().readableBytes()); + assertEquals("buffer or event", expectedIsBuffer, + bufferAndBacklog.buffer().isBuffer()); + if (expectedEventClass != null) { + assertThat(EventSerializer + .fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()), + instanceOf(expectedEventClass)); + } + assertEquals("more available", expectedIsMoreAvailable, + bufferAndBacklog.isMoreAvailable()); + assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable()); + assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); + assertEquals("next is event", expectedNextBufferIsEvent, + bufferAndBacklog.nextBufferIsEvent()); + + assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled()); + } finally { + bufferAndBacklog.buffer().recycleBuffer(); + } + assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled()); } protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {