[hotfix][network] various minor improvements
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d30df346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d30df346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d30df346

Branch: refs/heads/master
Commit: d30df346b43180a5a5e90b84b0f19b7f379985e2
Parents: 81259ad
Author: Nico Kruber <[email protected]>
Authored: Tue Feb 20 18:07:02 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Feb 27 09:07:14 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/partition/PipelinedSubpartition.java | 2 ++
 .../api/writer/AbstractCollectingResultPartitionWriter.java | 2 +-
 .../runtime/io/network/partition/SpillableSubpartitionTest.java | 2 --
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d30df346/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index dcaa360..a9c6e57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -265,6 +265,8 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	private int getNumberOfFinishedBuffers() {
+		assert Thread.holdsLock(buffers);
+
 		if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
 			return 1;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d30df346/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 0324375..981ca56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -76,7 +76,7 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 			Buffer buffer = bufferConsumer.build();
 			try {
 				deserializeBuffer(buffer);
-				if (!bufferConsumers.peek().isFinished()) {
+				if (!bufferConsumer.isFinished()) {
 					break;
 				}
 				bufferConsumers.pop().close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d30df346/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 43bcd31..a6be748 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -227,7 +227,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(read.buffer().isBuffer());
 		assertEquals(2, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertNotSame(bufferConsumer, read);
 		assertFalse(read.buffer().isRecycled());
 		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
@@ -239,7 +238,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(read.buffer().isBuffer());
 		assertEquals(1, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertNotSame(bufferConsumer, read);
 		assertFalse(read.buffer().isRecycled());
 		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
