[hotfix][runtime] Fix recycleBuffer in ResultPartitionTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c3c334c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c3c334c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c3c334c

Branch: refs/heads/master
Commit: 6c3c334c65896e1d286083f576707029c76e8be9
Parents: 5ad8450
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 18 10:22:52 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:18 2018 +0100

----------------------------------------------------------------------
 .../runtime/io/network/api/writer/ResultPartitionWriter.java   | 6 ++++++
 .../runtime/io/network/partition/ResultPartitionTest.java      | 5 ++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c3c334c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 454a9ea..a0a1dff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -42,6 +42,9 @@ public interface ResultPartitionWriter {
         *
         * <p>For PIPELINED {@link 
org.apache.flink.runtime.io.network.partition.ResultPartitionType}s,
         * this will trigger the deployment of consuming tasks after the first 
buffer has been added.
+        *
+        * <p>This method takes the ownership of the passed {@code buffer} and 
thus is responsible for releasing it's
+        * resources.
         */
        void writeBuffer(Buffer buffer, int subpartitionIndex) throws 
IOException;
 
@@ -51,6 +54,9 @@ public interface ResultPartitionWriter {
         * <p>The buffer is taken over and used for each of the channels.
         * It will be recycled afterwards.
         *
+        * <p>This method takes the ownership of the passed {@code buffer} and 
thus is responsible for releasing it's
+        * resources.
+        *
         * @param buffer the buffer to write
         */
        default void writeBufferToAllSubpartitions(final Buffer buffer) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/6c3c334c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 9c02b65..907939a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -31,7 +31,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -127,8 +126,8 @@ public class ResultPartitionTest {
                        // expected => ignored
                } finally {
                        if (!buffer.isRecycled()) {
-                               Assert.fail("buffer not recycled");
                                buffer.recycleBuffer();
+                               Assert.fail("buffer not recycled");
                        }
                        // should not have notified either
                        verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
@@ -161,8 +160,8 @@ public class ResultPartitionTest {
                        partition.writeBuffer(buffer, 0);
                } finally {
                        if (!buffer.isRecycled()) {
-                               Assert.fail("buffer not recycled");
                                buffer.recycleBuffer();
+                               Assert.fail("buffer not recycled");
                        }
                        // should not have notified either
                        verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));

Reply via email to