[FLINK-5277] [tests] Add unit tests for ResultPartition#add() in case of failures
The new tests verify that the given network buffer is recycled as expected and that no notifiers are called when adding a buffer fails. This closes #3309 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ceb7d82 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ceb7d82 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ceb7d82 Branch: refs/heads/master Commit: 1ceb7d82eccf4dc77482bddb61a664fd7f226b2b Parents: 5e32eb5 Author: Nico Kruber <n...@data-artisans.com> Authored: Tue Feb 14 17:42:28 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 20 01:01:23 2017 +0100 ---------------------------------------------------------------------- .../network/partition/ResultPartitionTest.java | 75 ++++++++++++++++++++ 1 file changed, 75 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1ceb7d82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index f6562a1..0cd3591 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.taskmanager.TaskActions; +import org.junit.Assert; import 
org.junit.Test; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -70,6 +73,78 @@ public class ResultPartitionTest { } } + @Test + public void testAddOnFinishedPipelinedPartition() throws Exception { + testAddOnFinishedPartition(ResultPartitionType.PIPELINED); + } + + @Test + public void testAddOnFinishedBlockingPartition() throws Exception { + testAddOnFinishedPartition(ResultPartitionType.BLOCKING); + } + + /** + * Tests {@link ResultPartition#add} on a partition which has already finished. + * + * @param pipelined the result partition type to set up + */ + protected void testAddOnFinishedPartition(final ResultPartitionType pipelined) + throws Exception { + Buffer buffer = TestBufferFactory.createBuffer(); + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + try { + ResultPartition partition = createPartition(notifier, pipelined, true); + partition.finish(); + reset(notifier); + // partition.add() should fail + partition.add(buffer, 0); + Assert.fail("exception expected"); + } catch (IllegalStateException e) { + // expected => ignored + } finally { + if (!buffer.isRecycled()) { + Assert.fail("buffer not recycled"); + buffer.recycle(); + } + // should not have notified either + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + } + + @Test + public void testAddOnReleasedPipelinedPartition() throws Exception { + testAddOnReleasedPartition(ResultPartitionType.PIPELINED); + } + + @Test + public void testAddOnReleasedBlockingPartition() throws Exception { + testAddOnReleasedPartition(ResultPartitionType.BLOCKING); + } + + /** + * Tests {@link ResultPartition#add} on a partition which has already been released. 
+ * + * @param pipelined the result partition type to set up + */ + protected void testAddOnReleasedPartition(final ResultPartitionType pipelined) + throws Exception { + Buffer buffer = TestBufferFactory.createBuffer(); + ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + try { + ResultPartition partition = createPartition(notifier, pipelined, true); + partition.release(); + // partition.add() silently drops the buffer but recycles it + partition.add(buffer, 0); + } finally { + if (!buffer.isRecycled()) { + Assert.fail("buffer not recycled"); + buffer.recycle(); + } + // should not have notified either + verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); + } + } + // ------------------------------------------------------------------------ private static ResultPartition createPartition(