[hotfix][runtime] Deduplicate code in PipelinedSubpartition

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2214a242
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2214a242
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2214a242

Branch: refs/heads/master
Commit: 2214a242f218dfc571f98b64fcde61f1a9f6013a
Parents: 6c3c334
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 18 10:41:18 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:19 2018 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        | 41 +++++++-------------
 1 file changed, 14 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2214a242/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 0637cc7..9c6197c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -67,6 +67,16 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        @Override
        public boolean add(Buffer buffer) throws IOException {
+               return add(buffer, false);
+       }
+
+       @Override
+       public void finish() throws IOException {
+               add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
true);
+               LOG.debug("Finished {}.", this);
+       }
+
+       private boolean add(Buffer buffer, boolean finish) throws IOException {
                checkNotNull(buffer);
 
                // view reference accessible outside the lock, but assigned 
inside the locked scope
@@ -83,41 +93,18 @@ class PipelinedSubpartition extends ResultSubpartition {
                        reader = readView;
                        updateStatistics(buffer);
                        increaseBuffersInBacklog(buffer);
-               }
-
-               // Notify the listener outside of the synchronized block
-               if (reader != null) {
-                       reader.notifyBuffersAvailable(1);
-               }
-
-               return true;
-       }
-
-       @Override
-       public void finish() throws IOException {
-               final Buffer buffer = 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
-               // view reference accessible outside the lock, but assigned 
inside the locked scope
-               final PipelinedSubpartitionView reader;
 
-               synchronized (buffers) {
-                       if (isFinished || isReleased) {
-                               return;
+                       if (finish) {
+                               isFinished = true;
                        }
-
-                       buffers.add(buffer);
-                       reader = readView;
-                       updateStatistics(buffer);
-
-                       isFinished = true;
                }
 
-               LOG.debug("Finished {}.", this);
-
                // Notify the listener outside of the synchronized block
                if (reader != null) {
                        reader.notifyBuffersAvailable(1);
                }
+
+               return true;
        }
 
        @Override

Reply via email to