[hotfix][runtime] Deduplicate buffersInBacklog code in Pipelined and Spillable 
subpartitions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10d11d79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10d11d79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10d11d79

Branch: refs/heads/master
Commit: 10d11d7991c18516d503dfcc82815d58fae01b46
Parents: 2214a24
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 18 12:09:15 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:20 2018 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        | 47 ++------------------
 .../network/partition/ResultSubpartition.java   | 46 ++++++++++++++++++-
 .../partition/SpillableSubpartition.java        | 45 +------------------
 3 files changed, 51 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10d11d79/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 9c6197c..2f4fd6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -27,9 +26,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
+
 import java.io.IOException;
-import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -43,9 +41,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        // 
------------------------------------------------------------------------
 
-       /** All buffers of this subpartition. Access to the buffers is 
synchronized on this object. */
-       private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
-
        /** The read view to consume this subpartition. */
        private PipelinedSubpartitionView readView;
 
@@ -55,10 +50,6 @@ class PipelinedSubpartition extends ResultSubpartition {
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
 
-       /** The number of non-event buffers currently in this subpartition. */
-       @GuardedBy("buffers")
-       private int buffersInBacklog;
-
        // 
------------------------------------------------------------------------
 
        PipelinedSubpartition(int index, ResultPartition parent) {
@@ -141,10 +132,10 @@ class PipelinedSubpartition extends ResultSubpartition {
        BufferAndBacklog pollBuffer() {
                synchronized (buffers) {
                        Buffer buffer = buffers.pollFirst();
-                       decreaseBuffersInBacklog(buffer);
+                       decreaseBuffersInBacklogUnsafe(buffer);
 
                        if (buffer != null) {
-                               return new BufferAndBacklog(buffer, 
buffersInBacklog, _nextBufferIsEvent());
+                               return new BufferAndBacklog(buffer, 
getBuffersInBacklog(), _nextBufferIsEvent());
                        } else {
                                return null;
                        }
@@ -176,36 +167,6 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
-       @VisibleForTesting
-       public int getBuffersInBacklog() {
-               return buffersInBacklog;
-       }
-
-       /**
-        * Decreases the number of non-event buffers by one after fetching a 
non-event
-        * buffer from this subpartition.
-        */
-       private void decreaseBuffersInBacklog(Buffer buffer) {
-               assert Thread.holdsLock(buffers);
-
-               if (buffer != null && buffer.isBuffer()) {
-                       buffersInBacklog--;
-               }
-       }
-
-       /**
-        * Increases the number of non-event buffers by one after adding a 
non-event
-        * buffer into this subpartition.
-        */
-       private void increaseBuffersInBacklog(Buffer buffer) {
-               assert Thread.holdsLock(buffers);
-
-               if (buffer != null && buffer.isBuffer()) {
-                       buffersInBacklog++;
-               }
-       }
-
-       @Override
        public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException {
                final int queueSize;
 
@@ -250,7 +211,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                return String.format(
                        "PipelinedSubpartition [number of buffers: %d (%d 
bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
-                       numBuffers, numBytes, buffersInBacklog, finished, 
hasReadView);
+                       numBuffers, numBytes, getBuffersInBacklog(), finished, 
hasReadView);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/10d11d79/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index e51f215..19447b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -21,7 +21,10 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.io.IOException;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -36,6 +39,13 @@ public abstract class ResultSubpartition {
        /** The parent partition this subpartition belongs to. */
        protected final ResultPartition parent;
 
+       /** All buffers of this subpartition. Access to the buffers is 
synchronized on this object. */
+       protected final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
+
+       /** The number of non-event buffers currently in this subpartition */
+       @GuardedBy("buffers")
+       private int buffersInBacklog;
+
        // - Statistics 
----------------------------------------------------------
 
        /** The total number of buffers (both data and event buffers) */
@@ -104,7 +114,9 @@ public abstract class ResultSubpartition {
         * scenarios since it does not make any concurrency guarantees.
         */
        @VisibleForTesting
-       abstract public int getBuffersInBacklog();
+       public int getBuffersInBacklog() {
+               return buffersInBacklog;
+       }
 
        /**
         * Makes a best effort to get the current size of the queue.
@@ -113,6 +125,38 @@ public abstract class ResultSubpartition {
         */
        abstract public int unsynchronizedGetNumberOfQueuedBuffers();
 
+       /**
+        * Decreases the number of non-event buffers by one after fetching a 
non-event
+        * buffer from this subpartition (for access by the subpartition views).
+        *
+        * @return backlog after the operation
+        */
+       public int decreaseBuffersInBacklog(Buffer buffer) {
+               synchronized (buffers) {
+                       return decreaseBuffersInBacklogUnsafe(buffer);
+               }
+       }
+
+       protected int decreaseBuffersInBacklogUnsafe(Buffer buffer) {
+               assert Thread.holdsLock(buffers);
+               if (buffer != null && buffer.isBuffer()) {
+                       buffersInBacklog--;
+               }
+               return buffersInBacklog;
+       }
+
+       /**
+        * Increases the number of non-event buffers by one after adding a 
non-event
+        * buffer into this subpartition.
+        */
+       protected void increaseBuffersInBacklog(Buffer buffer) {
+               assert Thread.holdsLock(buffers);
+
+               if (buffer != null && buffer.isBuffer()) {
+                       buffersInBacklog++;
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/10d11d79/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index e57e30a..dc0d0d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -26,10 +25,10 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -70,9 +69,6 @@ class SpillableSubpartition extends ResultSubpartition {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SpillableSubpartition.class);
 
-       /** Buffers are kept in this queue as long as we weren't ask to release 
any. */
-       private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
-
        /** The I/O manager used for spilling buffers to disk. */
        private final IOManager ioManager;
 
@@ -85,10 +81,6 @@ class SpillableSubpartition extends ResultSubpartition {
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
 
-       /** The number of non-event buffers currently in this subpartition */
-       @GuardedBy("buffers")
-       private int buffersInBacklog;
-
        /** The read view to consume this subpartition. */
        private ResultSubpartitionView readView;
 
@@ -264,39 +256,6 @@ class SpillableSubpartition extends ResultSubpartition {
        }
 
        @Override
-       @VisibleForTesting
-       public int getBuffersInBacklog() {
-               return buffersInBacklog;
-       }
-
-       /**
-        * Decreases the number of non-event buffers by one after fetching a 
non-event
-        * buffer from this subpartition (for access by the subpartition views).
-        *
-        * @return backlog after the operation
-        */
-       public int decreaseBuffersInBacklog(Buffer buffer) {
-               synchronized (buffers) {
-                       if (buffer != null && buffer.isBuffer()) {
-                               buffersInBacklog--;
-                       }
-                       return buffersInBacklog;
-               }
-       }
-
-       /**
-        * Increases the number of non-event buffers by one after adding a 
non-event
-        * buffer into this subpartition.
-        */
-       private void increaseBuffersInBacklog(Buffer buffer) {
-               assert Thread.holdsLock(buffers);
-
-               if (buffer != null && buffer.isBuffer()) {
-                       buffersInBacklog++;
-               }
-       }
-
-       @Override
        public int unsynchronizedGetNumberOfQueuedBuffers() {
                // since we do not synchronize, the size may actually be lower 
than 0!
                return Math.max(buffers.size(), 0);
@@ -307,7 +266,7 @@ class SpillableSubpartition extends ResultSubpartition {
                return String.format("SpillableSubpartition [%d number of 
buffers (%d bytes)," +
                                "%d number of buffers in backlog, finished? %s, 
read view? %s, spilled? %s]",
                        getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
-                       buffersInBacklog, isFinished, readView != null, 
spillWriter != null);
+                       getBuffersInBacklog(), isFinished, readView != null, 
spillWriter != null);
        }
 
 }

Reply via email to