[FLINK-8786] [network] Fix SpillableSubpartitionView#getNextBuffer returning wrong isMoreAvailable when processing last in-memory buffer
When processing the last in-memory buffer in SpillableSubpartitionView#getNextBuffer while the rest of the buffers are spilled, need to rely on the spilled view's isAvailable instead of always setting the isMoreAvailable flag of the returned BufferAndBacklog to false. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/835adcc3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/835adcc3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/835adcc3 Branch: refs/heads/release-1.5 Commit: 835adcc373ce169f202055e9b4f9dc3fb9123772 Parents: 5c7457a Author: Nico Kruber <n...@data-artisans.com> Authored: Mon Feb 26 16:27:44 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 9 17:01:52 2018 +0100 ---------------------------------------------------------------------- .../runtime/io/network/partition/SpillableSubpartitionView.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/835adcc3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 3c73e43..0f51bc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -177,7 +177,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { SpilledSubpartitionView spilled = spilledView; if (spilled != null) { if (current != null) { - return new BufferAndBacklog(current, isMoreAvailable, newBacklog, spilled.nextBufferIsEvent()); + return new BufferAndBacklog(current, spilled.isAvailable(), newBacklog, spilled.nextBufferIsEvent()); } else { return spilled.getNextBuffer(); }