[hotfix][runtime] Deduplicate code in PipelinedSubpartition
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2214a242 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2214a242 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2214a242 Branch: refs/heads/master Commit: 2214a242f218dfc571f98b64fcde61f1a9f6013a Parents: 6c3c334 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Jan 18 10:41:18 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:19 2018 +0100 ---------------------------------------------------------------------- .../partition/PipelinedSubpartition.java | 41 +++++++------------- 1 file changed, 14 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2214a242/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 0637cc7..9c6197c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -67,6 +67,16 @@ class PipelinedSubpartition extends ResultSubpartition { @Override public boolean add(Buffer buffer) throws IOException { + return add(buffer, false); + } + + @Override + public void finish() throws IOException { + add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), true); + LOG.debug("Finished {}.", this); + } + + private boolean add(Buffer buffer, boolean finish) throws IOException { checkNotNull(buffer); // view reference accessible outside the lock, but assigned inside the locked scope @@ -83,41 +93,18 @@ class PipelinedSubpartition extends ResultSubpartition { reader = readView; updateStatistics(buffer); increaseBuffersInBacklog(buffer); - } - - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); - } - - return true; - } - - @Override - public void finish() throws IOException { - final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); - - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; - synchronized (buffers) { - if (isFinished || isReleased) { - return; + if (finish) { + isFinished = true; } - - buffers.add(buffer); - reader = readView; - updateStatistics(buffer); - - isFinished = true; } - LOG.debug("Finished {}.", this); - // Notify the listener outside of the synchronized block if (reader != null) { reader.notifyBuffersAvailable(1); } + + return true; } @Override