[FLINK-8786] [network] Fix SpillableSubpartitionView#getNextBuffer returning wrong isMoreAvailable when processing last in-memory buffer
When processing the last in-memory buffer in SpillableSubpartitionView#getNextBuffer while the rest of the buffers are spilled, need to rely on the spilled view's isAvailable instead of always setting the isMoreAvailable flag of the returned BufferAndBacklog to false. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/112c54fb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/112c54fb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/112c54fb Branch: refs/heads/master Commit: 112c54fb07e2ffffa3f33322ba99a9d59c1a8dbc Parents: 18b75e3 Author: Nico Kruber <n...@data-artisans.com> Authored: Mon Feb 26 16:27:44 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 9 16:49:40 2018 +0100 ---------------------------------------------------------------------- .../runtime/io/network/partition/SpillableSubpartitionView.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/112c54fb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 3c73e43..0f51bc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -177,7 +177,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { SpilledSubpartitionView spilled = spilledView; if (spilled != null) { if (current != null) { - return new BufferAndBacklog(current, isMoreAvailable, newBacklog, spilled.nextBufferIsEvent()); + return new BufferAndBacklog(current, spilled.isAvailable(), newBacklog, spilled.nextBufferIsEvent()); } else { return spilled.getNextBuffer(); }