NicoK closed pull request #6692: [FLINK-10331][network] reduce unnecesary 
flushing
URL: https://github.com/apache/flink/pull/6692
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03db259..8630acee9a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ private void initializeWithPartialRecord(NonSpanningWrapper 
partial, int nextRec
                        }
                        else {
                                // collect in memory
-                               ensureBufferCapacity(numBytesChunk);
+                               ensureBufferCapacity(nextRecordLength);
                                partial.segment.get(partial.position, buffer, 
0, numBytesChunk);
                        }
 
@@ -502,8 +502,8 @@ private void addNextChunkFromMemorySegment(MemorySegment 
segment, int offset, in
                        int segmentRemaining = numBytes;
                        // check where to go. if we have a partial length, we 
need to complete it first
                        if (this.lengthBuffer.position() > 0) {
-                               int toPut = 
Math.min(this.lengthBuffer.remaining(), numBytes);
-                               segment.get(offset, this.lengthBuffer, toPut);
+                               int toPut = 
Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+                               segment.get(segmentPosition, this.lengthBuffer, 
toPut);
                                // did we complete the length?
                                if (this.lengthBuffer.hasRemaining()) {
                                        return;
@@ -515,6 +515,8 @@ private void addNextChunkFromMemorySegment(MemorySegment 
segment, int offset, in
                                        segmentRemaining -= toPut;
                                        if (this.recordLength > 
THRESHOLD_FOR_SPILLING) {
                                                this.spillingChannel = 
createSpillingChannel();
+                                       } else {
+                                               
ensureBufferCapacity(this.recordLength);
                                        }
                                }
                        }
@@ -527,9 +529,7 @@ private void addNextChunkFromMemorySegment(MemorySegment 
segment, int offset, in
                                // spill to file
                                ByteBuffer toWrite = 
segment.wrap(segmentPosition, toCopy);
                                this.spillingChannel.write(toWrite);
-                       }
-                       else {
-                               ensureBufferCapacity(accumulatedRecordBytes + 
toCopy);
+                       } else {
                                segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
                        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f1842911..6fb067ef8c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public void commit() {
         * @return number of written bytes.
         */
        public int finish() {
-               positionMarker.markFinished();
+               int writtenBytes = positionMarker.markFinished();
                commit();
-               return getWrittenBytes();
+               return writtenBytes;
        }
 
        public boolean isFinished() {
@@ -118,18 +118,10 @@ public boolean isFull() {
                return positionMarker.getCached() == getMaxCapacity();
        }
 
-       public boolean isEmpty() {
-               return positionMarker.getCached() == 0;
-       }
-
        public int getMaxCapacity() {
                return memorySegment.size();
        }
 
-       private int getWrittenBytes() {
-               return positionMarker.getCached();
-       }
-
        /**
         * Holds a reference to the current writer position. Negative values 
indicate that writer ({@link BufferBuilder}
         * has finished. Value {@code Integer.MIN_VALUE} represents finished 
empty buffer.
@@ -156,7 +148,7 @@ static int getAbsolute(int position) {
         * Cached writing implementation of {@link PositionMarker}.
         *
         * <p>Writer ({@link BufferBuilder}) and reader ({@link 
BufferConsumer}) caches must be implemented independently
-        * of one another - for example the cached values can not accidentally 
leak from one to another.
+        * of one another - so that the cached values can not accidentally leak 
from one to another.
         *
         * <p>Remember to commit the {@link SettablePositionMarker} to make the 
changes visible.
         */
@@ -181,12 +173,19 @@ public int getCached() {
                        return PositionMarker.getAbsolute(cachedPosition);
                }
 
-               public void markFinished() {
-                       int newValue = -getCached();
+               /**
+                * Marks this position as finished and returns the current 
position.
+                *
+                * @return current position as of {@link #getCached()}
+                */
+               public int markFinished() {
+                       int currentPosition = getCached();
+                       int newValue = -currentPosition;
                        if (newValue == 0) {
                                newValue = FINISHED_EMPTY;
                        }
                        set(newValue);
+                       return currentPosition;
                }
 
                public void move(int offset) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 4bad92f06b0..abde3ffc793 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@
 
        private final CachedPositionMarker writerPosition;
 
-       private int currentReaderPosition = 0;
+       private int currentReaderPosition;
 
        /**
         * Constructs {@link BufferConsumer} instance with content that can be 
changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ private BufferConsumer(Buffer buffer, 
BufferBuilder.PositionMarker currentWriter
                this.currentReaderPosition = currentReaderPosition;
        }
 
+       /**
+        * Checks whether the {@link BufferBuilder} has already been finished.
+        *
+        * <p>BEWARE: this method accesses the cached value of the position 
marker which is only updated
+        * after calls to {@link #build()}!
+        *
+        * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> 
otherwise
+        */
        public boolean isFinished() {
                return writerPosition.isFinished();
        }
@@ -84,17 +92,20 @@ public boolean isFinished() {
         */
        public Buffer build() {
                writerPosition.update();
-               Buffer slice = buffer.readOnlySlice(currentReaderPosition, 
writerPosition.getCached() - currentReaderPosition);
-               currentReaderPosition = writerPosition.getCached();
+               int cachedWriterPosition = writerPosition.getCached();
+               Buffer slice = buffer.readOnlySlice(currentReaderPosition, 
cachedWriterPosition - currentReaderPosition);
+               currentReaderPosition = cachedWriterPosition;
                return slice.retainBuffer();
        }
 
        /**
-        * @return a retained copy of self with separate indexes - it allows 
two read from the same {@link MemorySegment}
+        * Creates a retained copy of self with separate indexes which allows 
two read from the same {@link MemorySegment}
         * twice.
         *
-        * <p>WARNING: newly returned {@link BufferConsumer} will have reader 
index copied from the original buffer. In
-        * other words, data already consumed before copying will not be 
visible to the returned copies.
+        * <p>WARNING: the newly returned {@link BufferConsumer} will have its 
reader index copied from the original buffer.
+        * In other words, data already consumed before copying will not be 
visible to the returned copies.
+        *
+        * @return a retained copy of self with separate indexes
         */
        public BufferConsumer copy() {
                return new BufferConsumer(buffer.retainBuffer(), 
writerPosition.positionMarker, currentReaderPosition);
@@ -123,7 +134,7 @@ public int getWrittenBytes() {
         * Cached reading wrapper around {@link PositionMarker}.
         *
         * <p>Writer ({@link BufferBuilder}) and reader ({@link 
BufferConsumer}) caches must be implemented independently
-        * of one another - for example the cached values can not accidentally 
leak from one to another.
+        * of one another - so that the cached values can not accidentally leak 
from one to another.
         */
        private static class CachedPositionMarker {
                private final PositionMarker positionMarker;
@@ -133,7 +144,7 @@ public int getWrittenBytes() {
                 */
                private int cachedPosition;
 
-               public CachedPositionMarker(PositionMarker positionMarker) {
+               CachedPositionMarker(PositionMarker positionMarker) {
                        this.positionMarker = checkNotNull(positionMarker);
                        update();
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index deb0f4d8e75..a5bf30f4c6b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -92,7 +92,7 @@ public NetworkBuffer(MemorySegment memorySegment, 
BufferRecycler recycler, boole
 
        /**
         * Creates a new buffer instance backed by the given 
<tt>memorySegment</tt> with <tt>0</tt> for
-        * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+        * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>.
         *
         * @param memorySegment
         *              backing memory segment (defines {@link #maxCapacity})
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 9aa3920934e..90daf75fcc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -327,8 +327,7 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
                                nettyBuffer.readBytes(byteArray);
 
                                MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
-                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
-                               buffer.setSize(receivedSize);
+                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
                                inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 367c62d5acf..796e86f51b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -337,8 +337,7 @@ else if (bufferProvider.isDestroyed()) {
                                nettyBuffer.readBytes(byteArray);
 
                                MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
-                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
-                               buffer.setSize(receivedSize);
+                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
                                inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, -1);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 74798625416..2f7881610dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+/**
+ * Exception for failed partition requests due to non-existing partitions.
+ */
 public class PartitionNotFoundException extends IOException {
 
        private static final long serialVersionUID = 0L;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c6f3e158519..d2d7fdb324b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -36,6 +36,19 @@
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
+ *
+ * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link 
BufferConsumer} or a second
+ * {@link BufferConsumer} (in which case we will assume the first one 
finished), we will
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view 
created via
+ * {@link #createReadView(BufferAvailabilityListener)} of new data 
availability. Except by calling
+ * {@link #flush()} explicitly, we always only notify when the first finished 
buffer turns up and
+ * then, the reader has to drain the buffers via {@link #pollBuffer()} until 
its return value shows
+ * no more buffers being available. This results in a buffer queue which is 
either empty or has an
+ * unfinished {@link BufferConsumer} left from which the notifications will 
eventually start again.
+ *
+ * <p>Explicit calls to {@link #flush()} will force this
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
+ * {@link BufferConsumer} present in the queue.
  */
 class PipelinedSubpartition extends ResultSubpartition {
 
@@ -66,17 +79,6 @@ public boolean add(BufferConsumer bufferConsumer) {
                return add(bufferConsumer, false);
        }
 
-       @Override
-       public void flush() {
-               synchronized (buffers) {
-                       if (buffers.isEmpty()) {
-                               return;
-                       }
-                       flushRequested = !buffers.isEmpty();
-                       notifyDataAvailable();
-               }
-       }
-
        @Override
        public void finish() throws IOException {
                
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
@@ -99,7 +101,7 @@ private boolean add(BufferConsumer bufferConsumer, boolean 
finish) {
 
                        if (finish) {
                                isFinished = true;
-                               flush();
+                               notifyDataAvailable();
                        }
                        else {
                                maybeNotifyDataAvailable();
@@ -188,17 +190,17 @@ BufferAndBacklog pollBuffer() {
                                buffer,
                                isAvailableUnsafe(),
                                getBuffersInBacklog(),
-                               _nextBufferIsEvent());
+                               nextBufferIsEventUnsafe());
                }
        }
 
        boolean nextBufferIsEvent() {
                synchronized (buffers) {
-                       return _nextBufferIsEvent();
+                       return nextBufferIsEventUnsafe();
                }
        }
 
-       private boolean _nextBufferIsEvent() {
+       private boolean nextBufferIsEventUnsafe() {
                assert Thread.holdsLock(buffers);
 
                return !buffers.isEmpty() && !buffers.peekFirst().isBuffer();
@@ -279,6 +281,23 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
                return Math.max(buffers.size(), 0);
        }
 
+       @Override
+       public void flush() {
+               synchronized (buffers) {
+                       if (buffers.isEmpty()) {
+                               return;
+                       }
+                       if (!flushRequested) {
+                               flushRequested = true; // set this before the 
notification!
+                               // if there is more then 1 buffer, we already 
notified the reader
+                               // (at the latest when adding the second buffer)
+                               if (buffers.size() == 1) {
+                                       notifyDataAvailable();
+                               }
+                       }
+               }
+       }
+
        private void maybeNotifyDataAvailable() {
                // Notify only when we added first finished buffer.
                if (getNumberOfFinishedBuffers() == 1) {
@@ -295,6 +314,9 @@ private void notifyDataAvailable() {
        private int getNumberOfFinishedBuffers() {
                assert Thread.holdsLock(buffers);
 
+               // NOTE: isFinished() is not guaranteed to provide the most 
up-to-date state here
+               // worst-case: a single finished buffer sits around until the 
next flush() call
+               // (but we do not offer stronger guarantees anyway)
                if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
                        return 1;
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 93e5ba15097..b32f73f8bcc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -48,17 +48,17 @@
 /**
  * A result partition for data produced by a single task.
  *
- * <p> This class is the runtime part of a logical {@link 
IntermediateResultPartition}. Essentially,
+ * <p>This class is the runtime part of a logical {@link 
IntermediateResultPartition}. Essentially,
  * a result partition is a collection of {@link Buffer} instances. The buffers 
are organized in one
  * or more {@link ResultSubpartition} instances, which further partition the 
data depending on the
  * number of consuming tasks and the data {@link DistributionPattern}.
  *
- * <p> Tasks, which consume a result partition have to request one of its 
subpartitions. The request
+ * <p>Tasks, which consume a result partition have to request one of its 
subpartitions. The request
  * happens either remotely (see {@link RemoteInputChannel}) or locally (see 
{@link LocalInputChannel})
  *
  * <h2>Life-cycle</h2>
  *
- * The life-cycle of each result partition has three (possibly overlapping) 
phases:
+ * <p>The life-cycle of each result partition has three (possibly overlapping) 
phases:
  * <ol>
  * <li><strong>Produce</strong>: </li>
  * <li><strong>Consume</strong>: </li>
@@ -67,7 +67,7 @@
  *
  * <h2>Lazy deployment and updates of consuming tasks</h2>
  *
- * Before a consuming task can request the result, it has to be deployed. The 
time of deployment
+ * <p>Before a consuming task can request the result, it has to be deployed. 
The time of deployment
  * depends on the PIPELINED vs. BLOCKING characteristic of the result 
partition. With pipelined
  * results, receivers are deployed as soon as the first buffer is added to the 
result partition.
  * With blocking results on the other hand, receivers are deployed after the 
partition is finished.
@@ -79,7 +79,7 @@
 public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner 
{
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartition.class);
-       
+
        private final String owningTaskName;
 
        private final TaskActions taskActions;
@@ -174,10 +174,10 @@ public ResultPartition(
 
        /**
         * Registers a buffer pool with this result partition.
-        * <p>
-        * There is one pool for each result partition, which is shared by all 
its sub partitions.
-        * <p>
-        * The pool is registered with the partition *after* it as been 
constructed in order to conform
+        *
+        * <p>There is one pool for each result partition, which is shared by 
all its sub partitions.
+        *
+        * <p>The pool is registered with the partition *after* it as been 
constructed in order to conform
         * to the life-cycle of task registrations in the {@link TaskManager}.
         */
        public void registerBufferPool(BufferPool bufferPool) {
@@ -276,9 +276,9 @@ public void flush(int subpartitionIndex) {
        /**
         * Finishes the result partition.
         *
-        * <p> After this operation, it is not possible to add further data to 
the result partition.
+        * <p>After this operation, it is not possible to add further data to 
the result partition.
         *
-        * <p> For BLOCKING results, this will trigger the deployment of 
consuming tasks.
+        * <p>For BLOCKING results, this will trigger the deployment of 
consuming tasks.
         */
        public void finish() throws IOException {
                boolean success = false;
@@ -366,7 +366,7 @@ public int getNumTargetKeyGroups() {
        /**
         * Releases buffers held by this result partition.
         *
-        * <p> This is a callback from the buffer pool, which is registered for 
result partitions, which
+        * <p>This is a callback from the buffer pool, which is registered for 
result partitions, which
         * are back pressure-free.
         */
        @Override
@@ -395,7 +395,7 @@ public String toString() {
        /**
         * Pins the result partition.
         *
-        * <p> The partition can only be released after each subpartition has 
been consumed once per pin
+        * <p>The partition can only be released after each subpartition has 
been consumed once per pin
         * operation.
         */
        void pin() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 02212ced905..10eb0868e87 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
+/**
+ * Interface for notifications about consumable partitions.
+ */
 public interface ResultPartitionConsumableNotifier {
        void notifyPartitionConsumable(JobID jobId, ResultPartitionID 
partitionId, TaskActions taskActions);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index b84c33b8afb..cee79a0d2a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -27,7 +27,7 @@
 /**
  * Runtime identifier of a produced {@link IntermediateResultPartition}.
  *
- * <p> In failure cases the {@link IntermediateResultPartitionID} is not 
enough to uniquely
+ * <p>In failure cases the {@link IntermediateResultPartitionID} is not enough 
to uniquely
  * identify a result partition. It needs to be associated with the producing 
task as well to ensure
  * correct tracking of failed/restarted tasks.
  */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index db72d63cf8e..faeaaf2f997 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+/**
+ * Interface for creating result partitions.
+ */
 public interface ResultPartitionProvider {
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 256387c31f4..f62dbeebf64 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+/**
+ * Type of a result partition.
+ */
 public enum ResultPartitionType {
 
        BLOCKING(false, false, false),
@@ -27,12 +30,12 @@
        /**
         * Pipelined partitions with a bounded (local) buffer pool.
         *
-        * For streaming jobs, a fixed limit on the buffer pool size should 
help avoid that too much
+        * <p>For streaming jobs, a fixed limit on the buffer pool size should 
help avoid that too much
         * data is being buffered and checkpoint barriers are delayed. In 
contrast to limiting the
         * overall network buffer pool size, this, however, still allows to be 
flexible with regards
         * to the total number of partitions by selecting an appropriately big 
network buffer pool size.
         *
-        * For batch jobs, it will be best to keep this unlimited ({@link 
#PIPELINED}) since there are
+        * <p>For batch jobs, it will be best to keep this unlimited ({@link 
#PIPELINED}) since there are
         * no checkpoint barriers.
         */
        PIPELINED_BOUNDED(true, true, true);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index adc0ed35a2f..58a140221e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -43,16 +43,16 @@
        /** All buffers of this subpartition. Access to the buffers is 
synchronized on this object. */
        protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
 
-       /** The number of non-event buffers currently in this subpartition */
+       /** The number of non-event buffers currently in this subpartition. */
        @GuardedBy("buffers")
        private int buffersInBacklog;
 
        // - Statistics 
----------------------------------------------------------
 
-       /** The total number of buffers (both data and event buffers) */
+       /** The total number of buffers (both data and event buffers). */
        private long totalNumberOfBuffers;
 
-       /** The total number of bytes (both data and event buffers) */
+       /** The total number of bytes (both data and event buffers). */
        private long totalNumberOfBytes;
 
        public ResultSubpartition(int index, ResultPartition parent) {
@@ -102,19 +102,19 @@ protected Throwable getFailureCause() {
         * @throws IOException
         *              thrown in case of errors while adding the buffer
         */
-       abstract public boolean add(BufferConsumer bufferConsumer) throws 
IOException;
+       public abstract boolean add(BufferConsumer bufferConsumer) throws 
IOException;
 
-       abstract public void flush();
+       public abstract void flush();
 
-       abstract public void finish() throws IOException;
+       public abstract void finish() throws IOException;
 
-       abstract public void release() throws IOException;
+       public abstract void release() throws IOException;
 
-       abstract public ResultSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException;
+       public abstract ResultSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException;
 
        abstract int releaseMemory() throws IOException;
 
-       abstract public boolean isReleased();
+       public abstract boolean isReleased();
 
        /**
         * Gets the number of non-event buffers in this subpartition.
@@ -132,7 +132,7 @@ public int getBuffersInBacklog() {
         * This method must not acquire locks or interfere with the task and 
network threads in
         * any way.
         */
-       abstract public int unsynchronizedGetNumberOfQueuedBuffers();
+       public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
        /**
         * Decreases the number of non-event buffers by one after fetching a 
non-event
@@ -198,7 +198,6 @@ public int buffersInBacklog() {
                        return buffersInBacklog;
                }
 
-
                public boolean nextBufferIsEvent() {
                        return nextBufferIsEvent;
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index b1ccd634704..a7559553857 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -22,6 +22,7 @@
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 2a7cedf6949..a08ecc21f7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -34,8 +34,8 @@
 
 /**
  * An input channel consumes a single {@link ResultSubpartitionView}.
- * <p>
- * For each channel, the consumption life cycle is as follows:
+ *
+ * <p>For each channel, the consumption life cycle is as follows:
  * <ol>
  * <li>{@link #requestSubpartition(int)}</li>
  * <li>{@link #getNextBuffer()}</li>
@@ -66,7 +66,7 @@
 
        protected final Counter numBuffersIn;
 
-       /** The current backoff (in ms) */
+       /** The current backoff (in ms). */
        private int currentBackoff;
 
        protected InputChannel(
@@ -111,12 +111,12 @@ public ResultPartitionID getPartitionId() {
 
        /**
         * Notifies the owning {@link SingleInputGate} that this channel became 
non-empty.
-        * 
+        *
         * <p>This is guaranteed to be called only when a Buffer was added to a 
previously
         * empty input channel. The notion of empty is atomically consistent 
with the flag
         * {@link BufferAndAvailability#moreAvailable()} when polling the next 
buffer
         * from this channel.
-        * 
+        *
         * <p><b>Note:</b> When the input channel observes an exception, this
         * method is called regardless of whether the channel was empty before. 
That ensures
         * that the parent InputGate will always be notified about the 
exception.
@@ -132,8 +132,8 @@ protected void notifyChannelNonEmpty() {
        /**
         * Requests the queue with the specified index of the source 
intermediate
         * result partition.
-        * <p>
-        * The queue index to request depends on which sub task the channel 
belongs
+        *
+        * <p>The queue index to request depends on which sub task the channel 
belongs
         * to and is specified by the consumer of this channel.
         */
        abstract void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException;
@@ -149,8 +149,8 @@ protected void notifyChannelNonEmpty() {
 
        /**
         * Sends a {@link TaskEvent} back to the task producing the consumed 
result partition.
-        * <p>
-        * <strong>Important</strong>: The producing task has to be running to 
receive backwards events.
+        *
+        * <p><strong>Important</strong>: The producing task has to be running 
to receive backwards events.
         * This means that the result type needs to be pipelined and the task 
logic has to ensure that
         * the producer will wait for all backwards events. Otherwise, this 
will lead to an Exception
         * at runtime.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
index ceeb83dab40..c1886de6905 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelID.java
@@ -22,6 +22,9 @@
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+/**
+ * Identifier for input channels.
+ */
 public class InputChannelID extends AbstractID {
 
        private static final long serialVersionUID = 1L;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c78abb5165a..6e59f915730 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -26,10 +26,10 @@
 /**
  * An input gate consumes one or more partitions of a single produced 
intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel 
subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel 
subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator 
produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator 
produces data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -38,7 +38,7 @@
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will 
be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will 
be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore 
partitioned into one or more
  * subpartitions.
  *
@@ -59,7 +59,7 @@
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result 
in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result 
in parallel, resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further 
partitioned into two
  * subpartitions -- one for each parallel reduce subtask. As shown in the 
Figure, each reduce task
  * will have an input gate attached to it. This will provide its input, which 
will consist of one
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 69af4553fe4..ebb8b9d131b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -40,7 +40,7 @@ private InputGateMetrics(SingleInputGate inputGate) {
 
        // 
------------------------------------------------------------------------
 
-       // these methods are package private to make access from the nested 
classes faster 
+       // these methods are package private to make access from the nested 
classes faster
 
        /**
         * Iterates over all input channels and collects the total number of 
queued buffers in a
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4b3a8ff9773..e7986bb381c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -57,7 +57,7 @@
        /** Task event dispatcher for backwards events. */
        private final TaskEventDispatcher taskEventDispatcher;
 
-       /** The consumed subpartition */
+       /** The consumed subpartition. */
        private volatile ResultSubpartitionView subpartitionView;
 
        private volatile boolean isReleased;
@@ -245,7 +245,7 @@ void notifySubpartitionConsumed() throws IOException {
        }
 
        /**
-        * Releases the partition reader
+        * Releases the partition reader.
         */
        @Override
        void releaseAllResources() throws IOException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 20a7aed76e2..c0e9177b294 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -89,8 +89,8 @@ public void sendTaskEvent(TaskEvent event) throws IOException 
{
 
        /**
         * Returns <code>false</code>.
-        * <p>
-        * <strong>Important</strong>: It is important that the method correctly
+        *
+        * <p><strong>Important</strong>: It is important that the method 
correctly
         * always <code>false</code> for unknown input channels in order to not
         * finish the consumption of an intermediate result partition early.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index f73ede78dfd..f7db40bd921 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -59,7 +59,7 @@ public ResultSubpartitionView answer(InvocationOnMock 
invocation) throws Throwab
 
                return manager;
        }
-       
+
        public static ConnectionManager createDummyConnectionManager() throws 
Exception {
                final PartitionRequestClient mockClient = 
mock(PartitionRequestClient.class);
 
@@ -71,6 +71,6 @@ public static ConnectionManager 
createDummyConnectionManager() throws Exception
 
        // 
------------------------------------------------------------------------
 
-       /** This class is not meant to be instantiated */
+       /** This class is not meant to be instantiated. */
        private InputChannelTestUtils() {}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 73f3cfbc49a..5c643af1739 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -46,6 +47,9 @@
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Concurrency tests for input gates.
+ */
 public class InputGateConcurrentTest {
 
        @Test
@@ -192,9 +196,11 @@ public void testConsumptionWithMixedChannels() throws 
Exception {
        //  testing threads
        // 
------------------------------------------------------------------------
 
-       private static abstract class Source {
-       
+       private abstract static class Source {
+
                abstract void addBufferConsumer(BufferConsumer bufferConsumer) 
throws Exception;
+
+               abstract void flush();
        }
 
        private static class PipelinedSubpartitionSource extends Source {
@@ -209,6 +215,11 @@ public void testConsumptionWithMixedChannels() throws 
Exception {
                void addBufferConsumer(BufferConsumer bufferConsumer) throws 
Exception {
                        partition.add(bufferConsumer);
                }
+
+               @Override
+               void flush() {
+                       partition.flush();
+               }
        }
 
        private static class RemoteChannelSource extends Source {
@@ -222,14 +233,19 @@ void addBufferConsumer(BufferConsumer bufferConsumer) 
throws Exception {
 
                @Override
                void addBufferConsumer(BufferConsumer bufferConsumer) throws 
Exception {
-                       checkState(bufferConsumer.isFinished(), "Handling of 
non finished buffers is not yet implemented");
                        try {
-                               channel.onBuffer(bufferConsumer.build(), seq++, 
-1);
+                               Buffer buffer = bufferConsumer.build();
+                               checkState(bufferConsumer.isFinished(), 
"Handling of non finished buffers is not yet implemented");
+                               channel.onBuffer(buffer, seq++, -1);
                        }
                        finally {
                                bufferConsumer.close();
                        }
                }
+
+               @Override
+               void flush() {
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -245,6 +261,7 @@ void addBufferConsumer(BufferConsumer bufferConsumer) 
throws Exception {
                private final int yieldAfter;
 
                ProducerThread(Source[] sources, int numTotal, int maxChunk, 
int yieldAfter) {
+                       super("producer");
                        this.sources = sources;
                        this.numTotal = numTotal;
                        this.maxChunk = maxChunk;
@@ -273,7 +290,10 @@ public void go() throws Exception {
                                        //noinspection CallToThreadYield
                                        Thread.yield();
                                }
+                       }
 
+                       for (Source source : sources) {
+                               source.flush();
                        }
                }
        }
@@ -284,6 +304,7 @@ public void go() throws Exception {
                private final int numBuffers;
 
                ConsumerThread(SingleInputGate gate, int numBuffers) {
+                       super("consumer");
                        this.gate = gate;
                        this.numBuffers = numBuffers;
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 82a27cc92c0..66918757462 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -58,6 +58,9 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests verifying fairness in input gates.
+ */
 public class InputGateFairnessTest {
 
        @Test
@@ -115,7 +118,7 @@ public void testFairConsumptionLocalChannelsPreFilled() 
throws Exception {
                                max = Math.max(max, size);
                        }
 
-                       assertTrue(max == min || max == min+1);
+                       assertTrue(max == min || max == (min + 1));
                }
 
                assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -207,11 +210,11 @@ public void testFairConsumptionRemoteChannelsPreFilled() 
throws Exception {
 
                for (int i = 0; i < numChannels; i++) {
                        RemoteInputChannel channel = new RemoteInputChannel(
-                                       gate, i, new ResultPartitionID(), 
mock(ConnectionID.class), 
+                                       gate, i, new ResultPartitionID(), 
mock(ConnectionID.class),
                                        connManager, 0, 0, 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
                        channels[i] = channel;
-                       
+
                        for (int p = 0; p < buffersPerChannel; p++) {
                                channel.onBuffer(mockBuffer, p, -1);
                        }
@@ -233,7 +236,7 @@ public void testFairConsumptionRemoteChannelsPreFilled() 
throws Exception {
                                max = Math.max(max, size);
                        }
 
-                       assertTrue(max == min || max == min+1);
+                       assertTrue(max == min || max == (min + 1));
                }
 
                assertFalse(gate.getNextBufferOrEvent().isPresent());
@@ -287,7 +290,7 @@ public void testFairConsumptionRemoteChannels() throws 
Exception {
                                max = Math.max(max, size);
                        }
 
-                       assertTrue(max == min || max == min+1);
+                       assertTrue(max == min || max == (min + 1));
 
                        if (i % (2 * numChannels) == 0) {
                                // add three buffers to each channel, in random 
order
@@ -336,7 +339,7 @@ private void fillRandom(
                        partitions[i].onBuffer(buffer, sequenceNumbers[i]++, 
-1);
                }
        }
-       
+
        // 
------------------------------------------------------------------------
 
        private static class FairnessVerifyingInputGate extends SingleInputGate 
{
@@ -372,7 +375,6 @@ public FairnessVerifyingInputGate(
                        this.uniquenessChecker = new HashSet<>();
                }
 
-
                @Override
                public Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException {
                        synchronized (channelsWithData) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
index aecab759c78..b83067c11f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/LegacyPartialConsumePipelinedResultTest.java
@@ -40,14 +40,17 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class LegacyPartialConsumePipelinedResultTest extends TestLogger {
 
        // Test configuration
-       private final static int NUMBER_OF_TMS = 1;
-       private final static int NUMBER_OF_SLOTS_PER_TM = 1;
-       private final static int PARALLELISM = NUMBER_OF_TMS * 
NUMBER_OF_SLOTS_PER_TM;
+       private static final int NUMBER_OF_TMS = 1;
+       private static final int NUMBER_OF_SLOTS_PER_TM = 1;
+       private static final int PARALLELISM = NUMBER_OF_TMS * 
NUMBER_OF_SLOTS_PER_TM;
 
-       private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+       private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
 
        private static TestingCluster flink;
 
@@ -72,7 +75,7 @@ public static void tearDown() throws Exception {
        /**
         * Tests a fix for FLINK-1930.
         *
-        * <p> When consuming a pipelined result only partially, is is possible 
that local channels
+        * <p>When consuming a pipelined result only partially, is is possible 
that local channels
         * release the buffer pool, which is associated with the result 
partition, too early.  If the
         * producer is still producing data when this happens, it runs into an 
IllegalStateException,
         * because of the destroyed buffer pool.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 2eec34cdae2..f6689fe72d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -40,6 +40,9 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Test for consuming a pipelined result only partially.
+ */
 public class PartialConsumePipelinedResultTest extends TestLogger {
 
        // Test configuration
@@ -78,8 +81,8 @@ public static void tearDown() throws Exception {
        /**
         * Tests a fix for FLINK-1930.
         *
-        * <p> When consuming a pipelined result only partially, is is possible 
that local channels
-        * release the buffer pool, which is associated with the result 
partition, too early.  If the
+        * <p>When consuming a pipelined result only partially, is is possible 
that local channels
+        * release the buffer pool, which is associated with the result 
partition, too early. If the
         * producer is still producing data when this happens, it runs into an 
IllegalStateException,
         * because of the destroyed buffer pool.
         *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index bc66c9d292d..82f61ab493b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -25,7 +25,6 @@
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
@@ -41,17 +40,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -59,10 +53,15 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link PipelinedSubpartition}.
+ *
+ * @see PipelinedSubpartitionWithReadViewTest
+ */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
-       /** Executor service for concurrent produce/consume tests */
-       private final static ExecutorService executorService = 
Executors.newCachedThreadPool();
+       /** Executor service for concurrent produce/consume tests. */
+       private static final ExecutorService executorService = 
Executors.newCachedThreadPool();
 
        @AfterClass
        public static void shutdownExecutorService() throws Exception {
@@ -76,146 +75,6 @@ PipelinedSubpartition createSubpartition() {
                return new PipelinedSubpartition(0, parent);
        }
 
-       @Test(expected = IllegalStateException.class)
-       public void testAddTwoNonFinishedBuffer() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       
subpartition.add(createBufferBuilder().createBufferConsumer());
-                       
subpartition.add(createBufferBuilder().createBufferConsumer());
-                       assertNull(readView.getNextBuffer());
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       @Test
-       public void testAddEmptyNonFinishedBuffer() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       BufferBuilder bufferBuilder = createBufferBuilder();
-                       subpartition.add(bufferBuilder.createBufferConsumer());
-
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-                       assertNull(readView.getNextBuffer());
-
-                       bufferBuilder.finish();
-                       bufferBuilder = createBufferBuilder();
-                       subpartition.add(bufferBuilder.createBufferConsumer());
-
-                       assertEquals(1, 
availablityListener.getNumNotifications()); // notification from finishing 
previous buffer.
-                       assertNull(readView.getNextBuffer());
-                       assertEquals(1, subpartition.getBuffersInBacklog());
-               } finally {
-                       readView.releaseAllResources();
-                       subpartition.release();
-               }
-       }
-
-       @Test
-       public void testAddNonEmptyNotFinishedBuffer() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       BufferBuilder bufferBuilder = createBufferBuilder();
-                       
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
-                       subpartition.add(bufferBuilder.createBufferConsumer());
-
-                       // note that since the buffer builder is not finished, 
there is still a retained instance!
-                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
-                       assertEquals(1, subpartition.getBuffersInBacklog());
-               } finally {
-                       readView.releaseAllResources();
-                       subpartition.release();
-               }
-       }
-
-       /**
-        * Normally moreAvailable flag from InputChannel should ignore non 
finished BufferConsumers, otherwise we would
-        * busy loop on the unfinished BufferConsumers.
-        */
-       @Test
-       public void testUnfinishedBufferBehindFinished() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-
-               try {
-                       subpartition.add(createFilledBufferConsumer(1025)); // 
finished
-                       
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
-
-                       assertNextBuffer(readView, 1025, false, 1, false, true);
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       /**
-        * After flush call unfinished BufferConsumers should be reported as 
available, otherwise we might not flush some
-        * of the data.
-        */
-       @Test
-       public void testFlushWithUnfinishedBufferBehindFinished() throws 
Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-
-               try {
-                       subpartition.add(createFilledBufferConsumer(1025)); // 
finished
-                       
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
-                       subpartition.flush();
-
-                       assertNextBuffer(readView, 1025, true, 1, false, true);
-                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       @Test
-       public void testMultipleEmptyBuffers() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       subpartition.add(createFilledBufferConsumer(0));
-
-                       assertEquals(1, 
availablityListener.getNumNotifications());
-                       subpartition.add(createFilledBufferConsumer(0));
-                       assertEquals(2, 
availablityListener.getNumNotifications());
-
-                       subpartition.add(createFilledBufferConsumer(0));
-                       assertEquals(2, 
availablityListener.getNumNotifications());
-                       assertEquals(3, subpartition.getBuffersInBacklog());
-
-                       subpartition.add(createFilledBufferConsumer(1024));
-                       assertEquals(2, 
availablityListener.getNumNotifications());
-
-                       assertNextBuffer(readView, 1024, false, 0, false, true);
-               } finally {
-                       readView.releaseAllResources();
-                       subpartition.release();
-               }
-       }
-
        @Test
        public void testIllegalReadViewRequest() throws Exception {
                final PipelinedSubpartition subpartition = createSubpartition();
@@ -231,120 +90,23 @@ public void testIllegalReadViewRequest() throws Exception 
{
                }
        }
 
+       /**
+        * Verifies that the isReleased() check of the view checks the parent
+        * subpartition.
+        */
        @Test
-       public void testEmptyFlush() throws Exception {
-               final PipelinedSubpartition subpartition = createSubpartition();
+       public void testIsReleasedChecksParent() {
+               PipelinedSubpartition subpartition = 
mock(PipelinedSubpartition.class);
 
-               AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
-               subpartition.createReadView(listener);
-               subpartition.flush();
-               assertEquals(0, listener.getNumNotifications());
-       }
+               PipelinedSubpartitionView reader = new 
PipelinedSubpartitionView(
+                       subpartition, mock(BufferAvailabilityListener.class));
 
-       @Test
-       public void testBasicPipelinedProduceConsumeLogic() throws Exception {
-               final PipelinedSubpartition subpartition = createSubpartition();
+               assertFalse(reader.isReleased());
+               verify(subpartition, times(1)).isReleased();
 
-               BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
-
-               ResultSubpartitionView view = 
subpartition.createReadView(listener);
-
-               // Empty => should return null
-               assertFalse(view.nextBufferIsEvent());
-               assertNull(view.getNextBuffer());
-               assertFalse(view.nextBufferIsEvent()); // also after 
getNextBuffer()
-               verify(listener, times(0)).notifyDataAvailable();
-
-               // Add data to the queue...
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-
-               assertEquals(1, subpartition.getTotalNumberOfBuffers());
-               assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(0, subpartition.getTotalNumberOfBytes()); // only 
updated when getting the buffer
-
-               // ...should have resulted in a notification
-               verify(listener, times(1)).notifyDataAvailable();
-
-               // ...and one available result
-               assertFalse(view.nextBufferIsEvent());
-               BufferAndBacklog read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
-               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(0, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
-               assertFalse(view.nextBufferIsEvent());
-               assertNull(view.getNextBuffer());
-               assertEquals(0, subpartition.getBuffersInBacklog());
-
-               // Add data to the queue...
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-
-               assertEquals(2, subpartition.getTotalNumberOfBuffers());
-               assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               verify(listener, times(2)).notifyDataAvailable();
-
-               assertFalse(view.nextBufferIsEvent());
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
-               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(0, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
-               assertFalse(view.nextBufferIsEvent());
-               assertNull(view.getNextBuffer());
-               assertEquals(0, subpartition.getBuffersInBacklog());
-
-               // some tests with events
-
-               // fill with: buffer, event , and buffer
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-               subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-
-               assertEquals(5, subpartition.getTotalNumberOfBuffers());
-               assertEquals(2, subpartition.getBuffersInBacklog()); // two 
buffers (events don't count)
-               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               verify(listener, times(4)).notifyDataAvailable();
-
-               assertFalse(view.nextBufferIsEvent()); // the first buffer
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
-               assertEquals(3 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertTrue(read.nextBufferIsEvent());
-
-               assertTrue(view.nextBufferIsEvent()); // the event
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertFalse(read.buffer().isBuffer());
-               assertEquals(4 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
-
-               assertFalse(view.nextBufferIsEvent()); // the remaining buffer
-               read = view.getNextBuffer();
-               assertNotNull(read);
-               assertTrue(read.buffer().isBuffer());
-               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(0, subpartition.getBuffersInBacklog());
-               assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
-               assertFalse(read.nextBufferIsEvent());
-
-               assertEquals(5, subpartition.getTotalNumberOfBuffers());
-               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
-               verify(listener, times(4)).notifyDataAvailable();
+               when(subpartition.isReleased()).thenReturn(true);
+               assertTrue(reader.isReleased());
+               verify(subpartition, times(2)).isReleased();
        }
 
        @Test
@@ -367,25 +129,6 @@ public void testConcurrentSlowProduceAndSlowConsume() 
throws Exception {
                testProduceConsume(true, true);
        }
 
-       /**
-        * Verifies that the isReleased() check of the view checks the parent
-        * subpartition.
-        */
-       @Test
-       public void testIsReleasedChecksParent() throws Exception {
-               PipelinedSubpartition subpartition = 
mock(PipelinedSubpartition.class);
-
-               PipelinedSubpartitionView reader = new 
PipelinedSubpartitionView(
-                               subpartition, 
mock(BufferAvailabilityListener.class));
-
-               assertFalse(reader.isReleased());
-               verify(subpartition, times(1)).isReleased();
-
-               when(subpartition.isReleased()).thenReturn(true);
-               assertTrue(reader.isReleased());
-               verify(subpartition, times(2)).isReleased();
-       }
-
        private void testProduceConsume(boolean isSlowProducer, boolean 
isSlowConsumer) throws Exception {
                // Config
                final int producerBufferPoolSize = 8;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
new file mode 100644
index 00000000000..6f9920ebc39
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static 
org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
+import static 
org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
+import static 
org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
+import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for {@link PipelinedSubpartition} which require an 
availability listener and a
+ * read view.
+ *
+ * @see PipelinedSubpartitionTest
+ */
+public class PipelinedSubpartitionWithReadViewTest {
+
+       private PipelinedSubpartition subpartition;
+       private AwaitableBufferAvailablityListener availablityListener;
+       private PipelinedSubpartitionView readView;
+
+       @Before
+       public void setup() throws IOException {
+               final ResultPartition parent = mock(ResultPartition.class);
+               subpartition = new PipelinedSubpartition(0, parent);
+               availablityListener = new AwaitableBufferAvailablityListener();
+               readView = subpartition.createReadView(availablityListener);
+       }
+
+       @After
+       public void tearDown() {
+               readView.releaseAllResources();
+               subpartition.release();
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testAddTwoNonFinishedBuffer() {
+               subpartition.add(createBufferBuilder().createBufferConsumer());
+               subpartition.add(createBufferBuilder().createBufferConsumer());
+               assertNull(readView.getNextBuffer());
+       }
+
+       @Test
+       public void testAddEmptyNonFinishedBuffer() {
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               subpartition.add(bufferBuilder.createBufferConsumer());
+
+               assertEquals(0, availablityListener.getNumNotifications());
+               assertNull(readView.getNextBuffer());
+
+               bufferBuilder.finish();
+               bufferBuilder = createBufferBuilder();
+               subpartition.add(bufferBuilder.createBufferConsumer());
+
+               assertEquals(1, availablityListener.getNumNotifications()); // 
notification from finishing previous buffer.
+               assertNull(readView.getNextBuffer());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+       }
+
+       @Test
+       public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
+               subpartition.add(bufferBuilder.createBufferConsumer());
+
+               // note that since the buffer builder is not finished, there is 
still a retained instance!
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertEquals(1, subpartition.getBuffersInBacklog());
+       }
+
+       /**
+        * Normally moreAvailable flag from InputChannel should ignore non 
finished BufferConsumers, otherwise we would
+        * busy loop on the unfinished BufferConsumers.
+        */
+       @Test
+       public void testUnfinishedBufferBehindFinished() throws Exception {
+               subpartition.add(createFilledBufferConsumer(1025)); // finished
+               
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+
+               assertThat(availablityListener.getNumNotifications(), 
greaterThan(0L));
+               assertNextBuffer(readView, 1025, false, 1, false, true);
+               // not notified, but we could still access the unfinished buffer
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertNoNextBuffer(readView);
+       }
+
+       /**
+        * After flush call unfinished BufferConsumers should be reported as 
available, otherwise we might not flush some
+        * of the data.
+        */
+       @Test
+       public void testFlushWithUnfinishedBufferBehindFinished() throws 
Exception {
+               subpartition.add(createFilledBufferConsumer(1025)); // finished
+               
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+               long oldNumNotifications = 
availablityListener.getNumNotifications();
+               subpartition.flush();
+               // buffer queue is > 1, should already be notified, no further 
notification necessary
+               assertThat(oldNumNotifications, greaterThan(0L));
+               assertEquals(oldNumNotifications, 
availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, 1025, true, 1, false, true);
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertNoNextBuffer(readView);
+       }
+
+       /**
+        * A flush call with a buffer size of 1 should always notify consumers 
(unless already flushed).
+        */
+       @Test
+       public void testFlushWithUnfinishedBufferBehindFinished2() throws 
Exception {
+               // no buffers -> no notification or any other effects
+               subpartition.flush();
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               subpartition.add(createFilledBufferConsumer(1025)); // finished
+               
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+
+               assertNextBuffer(readView, 1025, false, 1, false, true);
+
+               long oldNumNotifications = 
availablityListener.getNumNotifications();
+               subpartition.flush();
+               // buffer queue is 1 again -> need to flush
+               assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
+               subpartition.flush();
+               // calling again should not flush again
+               assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertNoNextBuffer(readView);
+       }
+
+       @Test
+       public void testMultipleEmptyBuffers() throws Exception {
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               subpartition.add(createFilledBufferConsumer(0));
+
+               assertEquals(1, availablityListener.getNumNotifications());
+               subpartition.add(createFilledBufferConsumer(0));
+               assertEquals(2, availablityListener.getNumNotifications());
+
+               subpartition.add(createFilledBufferConsumer(0));
+               assertEquals(2, availablityListener.getNumNotifications());
+               assertEquals(3, subpartition.getBuffersInBacklog());
+
+               subpartition.add(createFilledBufferConsumer(1024));
+               assertEquals(2, availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, 1024, false, 0, false, true);
+       }
+
+       @Test
+       public void testEmptyFlush()  {
+               subpartition.flush();
+               assertEquals(0, availablityListener.getNumNotifications());
+       }
+
+       @Test
+       public void testBasicPipelinedProduceConsumeLogic() throws Exception {
+               // Empty => should return null
+               assertFalse(readView.nextBufferIsEvent());
+               assertNoNextBuffer(readView);
+               assertFalse(readView.nextBufferIsEvent()); // also after 
getNextBuffer()
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               // Add data to the queue...
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+
+               assertEquals(1, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+               assertEquals(0, subpartition.getTotalNumberOfBytes()); // only 
updated when getting the buffer
+
+               // ...should have resulted in a notification
+               assertEquals(1, availablityListener.getNumNotifications());
+
+               // ...and one available result
+               assertNextBuffer(readView, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
+               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(0, subpartition.getBuffersInBacklog());
+               assertNoNextBuffer(readView);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               // Add data to the queue...
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+
+               assertEquals(2, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(2, availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
+               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(0, subpartition.getBuffersInBacklog());
+               assertNoNextBuffer(readView);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               // some tests with events
+
+               // fill with: buffer, event, and buffer
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+               subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+
+               assertEquals(5, subpartition.getTotalNumberOfBuffers());
+               assertEquals(2, subpartition.getBuffersInBacklog()); // two 
buffers (events don't count)
+               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(4, availablityListener.getNumNotifications());
+
+               // the first buffer
+               assertNextBuffer(readView, BUFFER_SIZE, true, 
subpartition.getBuffersInBacklog() - 1, true, true);
+               assertEquals(3 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(1, subpartition.getBuffersInBacklog());
+
+               // the event
+               assertNextEvent(readView, BUFFER_SIZE, null, true, 
subpartition.getBuffersInBacklog(), false, true);
+               assertEquals(4 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(1, subpartition.getBuffersInBacklog());
+
+               // the remaining buffer
+               assertNextBuffer(readView, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
+               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               // nothing more
+               assertNoNextBuffer(readView);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               assertEquals(5, subpartition.getTotalNumberOfBuffers());
+               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
+               assertEquals(4, availablityListener.getNumNotifications());
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 6bff0f618c0..d182f119360 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -26,6 +26,9 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link ProducerFailedException}.
+ */
 public class ProducerFailedExceptionTest {
 
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 817795ce7ea..57d2cd648ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -228,24 +228,20 @@ public void testConsumeSpilledPartition() throws 
Exception {
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                assertEquals(1, listener.getNumNotifications());
-
                assertFalse(reader.nextBufferIsEvent()); // buffer
+
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
                assertEquals(2, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // event
                assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
                assertEquals(0, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // end of partition 
event
                assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(0, partition.getBuffersInBacklog());
 
@@ -314,24 +310,20 @@ public void testConsumeSpilledPartitionSpilledBeforeAdd() 
throws Exception {
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                assertEquals(1, listener.getNumNotifications());
-
                assertFalse(reader.nextBufferIsEvent()); // full buffer
+
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
                assertEquals(2, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // full buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // event
                assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // partial buffer
                assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, 
true);
                assertEquals(0, partition.getBuffersInBacklog());
 
-               assertTrue(reader.nextBufferIsEvent()); // end of partition 
event
                assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(0, partition.getBuffersInBacklog());
 
@@ -370,6 +362,7 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
                assertFalse(bufferConsumer.isRecycled());
 
                assertFalse(reader.nextBufferIsEvent());
+
                // first buffer (non-spilled)
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
false);
                assertEquals(BUFFER_DATA_SIZE, 
partition.getTotalNumberOfBytes()); // only updated when getting/spilling the 
buffers
@@ -397,19 +390,19 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
                Buffer buffer = bufferConsumer.build();
                buffer.retainBuffer();
 
-               assertFalse(reader.nextBufferIsEvent()); // second buffer 
(retained in SpillableSubpartition#nextBuffer)
+               // second buffer (retained in SpillableSubpartition#nextBuffer)
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, 
false);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer 
statistics
                assertEquals(1, partition.getBuffersInBacklog());
 
                bufferConsumer.close(); // recycle the retained buffer from 
above (should be the last reference!)
 
-               assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
+               // the event (spilled)
                assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(1, partition.getBuffersInBacklog());
 
-               assertFalse(reader.nextBufferIsEvent()); // last buffer 
(spilled)
+               // last buffer (spilled)
                assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
@@ -418,7 +411,6 @@ public void 
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
                assertTrue(buffer.isRecycled());
 
                // End of partition
-               assertTrue(reader.nextBufferIsEvent());
                assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
                assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
                assertEquals(0, partition.getBuffersInBacklog());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 8c902157da9..9f5e6d0080f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -207,6 +207,8 @@ private static void assertNextBufferOrEvent(
                        assertEquals("backlog", expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
                        assertEquals("next is event", expectedNextBufferIsEvent,
                                bufferAndBacklog.nextBufferIsEvent());
+                       assertEquals("next is event", expectedNextBufferIsEvent,
+                               readView.nextBufferIsEvent());
 
                        assertFalse("not recycled", 
bufferAndBacklog.buffer().isRecycled());
                } finally {
@@ -215,7 +217,7 @@ private static void assertNextBufferOrEvent(
                assertEquals("recycled", expectedRecycledAfterRecycle, 
bufferAndBacklog.buffer().isRecycled());
        }
 
-       protected void assertNoNextBuffer(ResultSubpartitionView readView) 
throws IOException, InterruptedException {
+       static void assertNoNextBuffer(ResultSubpartitionView readView) throws 
IOException, InterruptedException {
                assertNull(readView.getNextBuffer());
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index d757aa9c0f5..2f5a013c56a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -31,6 +32,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link InputChannel}.
+ */
 public class InputChannelTest {
 
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index a91473327d0..a67df0b41dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -36,6 +36,11 @@
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Input gate helper for unit tests.
+ *
+ * @param <T> type of the value to handle
+ */
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> 
extends TestSingleInputGate {
 
        private final TestInputChannel inputChannel = new 
TestInputChannel(inputGate, 0);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 1ecb67ff82c..2afd6d4aff9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -75,12 +75,15 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link LocalInputChannel}.
+ */
 public class LocalInputChannelTest {
 
        /**
         * Tests the consumption of multiple subpartitions via local input 
channels.
         *
-        * <p> Multiple producer tasks produce pipelined partitions, which are 
consumed by multiple
+        * <p>Multiple producer tasks produce pipelined partitions, which are 
consumed by multiple
         * tasks via local input channels.
         */
        @Test
@@ -266,20 +269,22 @@ public void testProducerFailedException() throws 
Exception {
         * Verifies that concurrent release via the SingleInputGate and 
re-triggering
         * of a partition request works smoothly.
         *
-        * - SingleInputGate acquires its request lock and tries to release all
+        * <ul>
+        * <li>SingleInputGate acquires its request lock and tries to release 
all
         * registered channels. When releasing a channel, it needs to acquire
-        * the channel's shared request-release lock.
-        * - If a LocalInputChannel concurrently retriggers a partition request 
via
+        * the channel's shared request-release lock.</li>
+        * <li>If a LocalInputChannel concurrently retriggers a partition 
request via
         * a Timer Thread it acquires the channel's request-release lock and 
calls
         * the retrigger callback on the SingleInputGate, which again tries to
-        * acquire the gate's request lock.
+        * acquire the gate's request lock.</li>
+        * </ul>
         *
-        * For certain timings this obviously leads to a deadlock. This test 
reliably
+        * <p>For certain timings this obviously leads to a deadlock. This test 
reliably
         * reproduced such a timing (reported in FLINK-5228). This test is 
pretty much
         * testing the buggy implementation and has not much more general 
value. If it
         * becomes obsolete at some point (future greatness ;)), feel free to 
remove it.
         *
-        * The fix in the end was to to not acquire the channels lock when 
releasing it
+        * <p>The fix in the end was to to not acquire the channels lock when 
releasing it
         * and/or not doing any input gate callbacks while holding the 
channel's lock.
         * I decided to do both.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9141b36d445..ec80459f0ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -336,9 +336,11 @@ public void testProducerFailedException() throws Exception 
{
         * Tests to verify the behaviours of three different processes if the 
number of available
         * buffers is less than required buffers.
         *
-        * 1. Recycle the floating buffer
-        * 2. Recycle the exclusive buffer
-        * 3. Decrease the sender's backlog
+        * <ol>
+        * <li>Recycle the floating buffer</li>
+        * <li>Recycle the exclusive buffer</li>
+        * <li>Decrease the sender's backlog</li>
+        * </ol>
         */
        @Test
        public void testAvailableBuffersLessThanRequiredBuffers() throws 
Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 71203276dd9..4bf5b220be8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -164,7 +164,7 @@ public void testBackwardsEventWithUninitializedChannel() 
throws Exception {
 
                final ResultSubpartitionView iterator = 
mock(ResultSubpartitionView.class);
                when(iterator.getNextBuffer()).thenReturn(
-                       new BufferAndBacklog(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
FreeingBufferRecycler.INSTANCE), false,0, false));
+                       new BufferAndBacklog(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
FreeingBufferRecycler.INSTANCE), false, 0, false));
 
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager.createSubpartitionView(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index b0bafd505ec..33f570917b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 2e012253d94..96f01fd5667 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -32,13 +32,16 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link UnionInputGate}.
+ */
 public class UnionInputGateTest {
 
        /**
         * Tests basic correctness of buffer-or-event interleaving and correct 
<code>null</code> return
         * value after receiving all end-of-partition events.
         *
-        * <p> For buffer-or-event instances, it is important to verify that 
they have been set off to
+        * <p>For buffer-or-event instances, it is important to verify that 
they have been set off to
         * the correct logical index.
         */
        @Test(timeout = 120 * 1000)
diff --git a/tools/maven/suppressions-runtime.xml 
b/tools/maven/suppressions-runtime.xml
index 33a92e3075f..51793e510ce 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -87,11 +87,11 @@ under the License.
                
files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
                checks="AvoidStarImport|UnusedImports"/>
        <suppress
-               
files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+               
files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
        <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
        <suppress
-               
files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+               
files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|serialization|util)[/\\](.*)"
                checks="AvoidStarImport|UnusedImports"/>
        <!--Test class copied from the netty project-->
        <suppress


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to