[FLINK-5277] [tests] Add unit tests for ResultPartition#add() in case of 
failures

This verifies that the given network buffer is recycled as expected and that
no notifiers are called upon failures to add a buffer.

This closes #3309


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ceb7d82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ceb7d82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ceb7d82

Branch: refs/heads/master
Commit: 1ceb7d82eccf4dc77482bddb61a664fd7f226b2b
Parents: 5e32eb5
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Feb 14 17:42:28 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:23 2017 +0100

----------------------------------------------------------------------
 .../network/partition/ResultPartitionTest.java  | 75 ++++++++++++++++++++
 1 file changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ceb7d82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f6562a1..0cd3591 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -70,6 +73,78 @@ public class ResultPartitionTest {
                }
        }
 
+       @Test
+       public void testAddOnFinishedPipelinedPartition() throws Exception {
+               testAddOnFinishedPartition(ResultPartitionType.PIPELINED);
+       }
+
+       @Test
+       public void testAddOnFinishedBlockingPartition() throws Exception {
+               testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
+       }
+
+       /**
+        * Tests {@link ResultPartition#add} on a partition which has already 
finished.
+        *
+        * @param pipelined the result partition type to set up
+        */
+       protected void testAddOnFinishedPartition(final ResultPartitionType 
pipelined)
+               throws Exception {
+               Buffer buffer = TestBufferFactory.createBuffer();
+               ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+               try {
+                       ResultPartition partition = createPartition(notifier, 
pipelined, true);
+                       partition.finish();
+                       reset(notifier);
+                       // partition.add() should fail
+                       partition.add(buffer, 0);
+                       Assert.fail("exception expected");
+               } catch (IllegalStateException e) {
+                       // expected => ignored
+               } finally {
+                       if (!buffer.isRecycled()) {
+                               Assert.fail("buffer not recycled");
+                               buffer.recycle();
+                       }
+                       // should not have notified either
+                       verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+               }
+       }
+
+       @Test
+       public void testAddOnReleasedPipelinedPartition() throws Exception {
+               testAddOnReleasedPartition(ResultPartitionType.PIPELINED);
+       }
+
+       @Test
+       public void testAddOnReleasedBlockingPartition() throws Exception {
+               testAddOnReleasedPartition(ResultPartitionType.BLOCKING);
+       }
+
+       /**
+        * Tests {@link ResultPartition#add} on a partition which has already 
been released.
+        *
+        * @param pipelined the result partition type to set up
+        */
+       protected void testAddOnReleasedPartition(final ResultPartitionType 
pipelined)
+               throws Exception {
+               Buffer buffer = TestBufferFactory.createBuffer();
+               ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+               try {
+                       ResultPartition partition = createPartition(notifier, 
pipelined, true);
+                       partition.release();
+                       // partition.add() silently drops the buffer but 
recycles it
+                       partition.add(buffer, 0);
+               } finally {
+                       if (!buffer.isRecycled()) {
+                               Assert.fail("buffer not recycled");
+                               buffer.recycle();
+                       }
+                       // should not have notified either
+                       verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        private static ResultPartition createPartition(

Reply via email to