[hotfix][runtime] Fix recycleBuffer in ResultPartitionTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c3c334c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c3c334c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c3c334c Branch: refs/heads/master Commit: 6c3c334c65896e1d286083f576707029c76e8be9 Parents: 5ad8450 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Jan 18 10:22:52 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:18 2018 +0100 ---------------------------------------------------------------------- .../runtime/io/network/api/writer/ResultPartitionWriter.java | 6 ++++++ .../runtime/io/network/partition/ResultPartitionTest.java | 5 ++--- 2 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6c3c334c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 454a9ea..a0a1dff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -42,6 +42,9 @@ public interface ResultPartitionWriter { * * <p>For PIPELINED {@link org.apache.flink.runtime.io.network.partition.ResultPartitionType}s, * this will trigger the deployment of consuming tasks after the first buffer has been added. + * + * <p>This method takes the ownership of the passed {@code buffer} and thus is responsible for releasing its + * resources. 
*/ void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException; @@ -51,6 +54,9 @@ public interface ResultPartitionWriter { * <p>The buffer is taken over and used for each of the channels. * It will be recycled afterwards. * + * <p>This method takes the ownership of the passed {@code buffer} and thus is responsible for releasing its + * resources. + * * @param buffer the buffer to write */ default void writeBufferToAllSubpartitions(final Buffer buffer) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/6c3c334c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 9c02b65..907939a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -31,7 +31,6 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -127,8 +126,8 @@ public class ResultPartitionTest { // expected => ignored } finally { if (!buffer.isRecycled()) { - Assert.fail("buffer not recycled"); buffer.recycleBuffer(); + Assert.fail("buffer not recycled"); } // should not have notified either verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class)); @@ -161,8 +160,8 @@ public class ResultPartitionTest { partition.writeBuffer(buffer, 0); } finally { if (!buffer.isRecycled()) { - Assert.fail("buffer not recycled"); 
buffer.recycleBuffer(); + Assert.fail("buffer not recycled"); } // should not have notified either verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));