[FLINK-8591][runtime] Pass unfinished bufferConsumers to subpartitions

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b1e127f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b1e127f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b1e127f

Branch: refs/heads/master
Commit: 5b1e127f7b3acd8f82893dda394fbcb7fe93d20d
Parents: 98bd689
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Wed Jan 24 14:43:23 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:41 2018 +0100

----------------------------------------------------------------------
 .../serialization/SpanningRecordSerializer.java |   3 -
 .../io/network/api/writer/RecordWriter.java     |  96 +++++++--------
 .../api/writer/ResultPartitionWriter.java       |   8 ++
 .../CreditBasedSequenceNumberingViewReader.java |  38 +++---
 .../io/network/netty/PartitionRequestQueue.java |  79 ++++++------
 .../netty/SequenceNumberingViewReader.java      |  31 ++---
 .../partition/BufferAvailabilityListener.java   |   6 +-
 .../partition/PipelinedSubpartition.java        |  84 ++++++++++---
 .../partition/PipelinedSubpartitionView.java    |   4 +-
 .../io/network/partition/ResultPartition.java   |   7 ++
 .../network/partition/ResultSubpartition.java   |  14 ++-
 .../partition/ResultSubpartitionView.java       |   2 +-
 .../partition/SpillableSubpartition.java        |  11 ++
 .../partition/SpillableSubpartitionView.java    |  14 ++-
 .../partition/SpilledSubpartitionView.java      |   8 +-
 .../partition/consumer/InputChannel.java        |   5 +-
 .../partition/consumer/LocalInputChannel.java   |  42 +++----
 .../partition/consumer/RemoteInputChannel.java  |   5 +-
 .../partition/consumer/SingleInputGate.java     |  44 +++----
 .../partition/consumer/UnionInputGate.java      |  21 ++++
 .../partition/consumer/UnknownInputChannel.java |   3 +-
 .../operators/shipping/OutputCollector.java     |  17 ++-
 ...AbstractCollectingResultPartitionWriter.java |  20 ++-
 .../io/network/api/writer/RecordWriterTest.java |  12 +-
 .../network/buffer/BufferBuilderTestUtils.java  |   4 +
 .../netty/CancelPartitionRequestTest.java       |   9 +-
 .../netty/PartitionRequestQueueTest.java        |  90 ++++++++++----
 .../netty/ServerTransportErrorHandlingTest.java |   2 +-
 .../AwaitableBufferAvailablityListener.java     |  47 +++++++
 .../NoOpBufferAvailablityListener.java          |  28 +++++
 .../PartialConsumePipelinedResultTest.java      |   2 +-
 .../partition/PipelinedSubpartitionTest.java    | 123 ++++++++++++++++---
 .../partition/SpillableSubpartitionTest.java    |  47 ++-----
 .../network/partition/SubpartitionTestBase.java |  13 ++
 .../partition/consumer/InputChannelTest.java    |   5 +-
 .../IteratorWrappingTestSingleInputGate.java    |  10 +-
 .../consumer/LocalInputChannelTest.java         |  16 +--
 .../partition/consumer/SingleInputGateTest.java |   2 +-
 .../partition/consumer/TestInputChannel.java    |  14 ++-
 .../network/util/TestSubpartitionConsumer.java  |  27 ++--
 .../flink/streaming/api/graph/StreamConfig.java |   4 +
 .../runtime/io/RecordWriterOutput.java          |   4 -
 .../runtime/io/StreamRecordWriter.java          |  11 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  13 +-
 .../consumer/StreamTestSingleInputGate.java     |  16 ++-
 .../runtime/io/StreamRecordWriterTest.java      | 113 -----------------
 46 files changed, 672 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index e1d7fb1..ba8e659 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -148,9 +148,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
 
        @Override
        public void clear() {
-               if (targetBuffer != null) {
-                       targetBuffer.finish();
-               }
                targetBuffer = null;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 801e6eb..51dfbde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -61,7 +61,7 @@ public class RecordWriter<T extends IOReadableWritable> {
        /** {@link RecordSerializer} per outgoing channel. */
        private final RecordSerializer<T>[] serializers;
 
-       private final Optional<BufferConsumer>[] bufferConsumers;
+       private final Optional<BufferBuilder>[] bufferBuilders;
 
        private final Random rng = new XORShiftRandom();
 
@@ -84,10 +84,10 @@ public class RecordWriter<T extends IOReadableWritable> {
                 * serializer.
                 */
                this.serializers = new SpanningRecordSerializer[numChannels];
-               this.bufferConsumers = new Optional[numChannels];
+               this.bufferBuilders = new Optional[numChannels];
                for (int i = 0; i < numChannels; i++) {
                        serializers[i] = new SpanningRecordSerializer<T>();
-                       bufferConsumers[i] = Optional.empty();
+                       bufferBuilders[i] = Optional.empty();
                }
        }
 
@@ -117,28 +117,24 @@ public class RecordWriter<T extends IOReadableWritable> {
        private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
                RecordSerializer<T> serializer = serializers[targetChannel];
 
-               synchronized (serializer) {
-                       SerializationResult result = 
serializer.addRecord(record);
-
-                       while (result.isFullBuffer()) {
-                               if (tryWriteAndClearBuffer(targetChannel, 
serializer)) {
-                                       // If this was a full record, we are 
done. Not breaking
-                                       // out of the loop at this point will 
lead to another
-                                       // buffer request before breaking out 
(that would not be
-                                       // a problem per se, but it can lead to 
stalls in the
-                                       // pipeline).
-                                       if (result.isFullRecord()) {
-                                               break;
-                                       }
+               SerializationResult result = serializer.addRecord(record);
+
+               while (result.isFullBuffer()) {
+                       if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
+                               // If this was a full record, we are done. Not 
breaking
+                               // out of the loop at this point will lead to 
another
+                               // buffer request before breaking out (that 
would not be
+                               // a problem per se, but it can lead to stalls 
in the
+                               // pipeline).
+                               if (result.isFullRecord()) {
+                                       break;
                                }
-                               BufferBuilder bufferBuilder =
-                                       
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
-                               
checkState(!bufferConsumers[targetChannel].isPresent());
-                               bufferConsumers[targetChannel] = 
Optional.of(bufferBuilder.createBufferConsumer());
-                               result = 
serializer.setNextBufferBuilder(bufferBuilder);
                        }
-                       checkState(!serializer.hasSerializedData(), "All data 
should be written at once");
+                       BufferBuilder bufferBuilder = 
requestNewBufferBuilder(targetChannel);
+
+                       result = serializer.setNextBufferBuilder(bufferBuilder);
                }
+               checkState(!serializer.hasSerializedData(), "All data should be 
written at once");
        }
 
        public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException, InterruptedException {
@@ -146,34 +142,24 @@ public class RecordWriter<T extends IOReadableWritable> {
                        for (int targetChannel = 0; targetChannel < 
numChannels; targetChannel++) {
                                RecordSerializer<T> serializer = 
serializers[targetChannel];
 
-                               synchronized (serializer) {
-                                       tryWriteAndClearBuffer(targetChannel, 
serializer);
+                               tryFinishCurrentBufferBuilder(targetChannel, 
serializer);
 
-                                       // retain the buffer so that it can be 
recycled by each channel of targetPartition
-                                       
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
-                               }
+                               // retain the buffer so that it can be recycled 
by each channel of targetPartition
+                               
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
                        }
                        return eventBufferConsumer;
                }
        }
 
-       public void flush() throws IOException {
-               for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-                       RecordSerializer<T> serializer = 
serializers[targetChannel];
-
-                       synchronized (serializer) {
-                               tryWriteAndClearBuffer(targetChannel, 
serializer);
-                       }
-               }
+       public void flush() {
+               targetPartition.flush();
        }
 
        public void clearBuffers() {
                for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
                        RecordSerializer<?> serializer = 
serializers[targetChannel];
-                       synchronized (serializer) {
-                               closeBufferConsumer(targetChannel);
-                               serializer.clear();
-                       }
+                       closeBufferConsumer(targetChannel);
+                       serializer.clear();
                }
        }
 
@@ -185,33 +171,35 @@ public class RecordWriter<T extends IOReadableWritable> {
        }
 
        /**
-        * Tries to consume serialized data and (if data present) writes them 
to the {@link ResultPartitionWriter}.
-        * After writing it clean ups the state.
-        *
-        * <p><b>Needs to be synchronized on the serializer!</b>
+        * Marks the current {@link BufferBuilder} as finished and clears the state for the next one.
         *
         * @return true if some data were written
         */
-       private boolean tryWriteAndClearBuffer(
-                       int targetChannel,
-                       RecordSerializer<T> serializer) throws IOException {
+       private boolean tryFinishCurrentBufferBuilder(int targetChannel, 
RecordSerializer<T> serializer) {
 
-               if (!bufferConsumers[targetChannel].isPresent()) {
+               if (!bufferBuilders[targetChannel].isPresent()) {
                        return false;
                }
-               BufferConsumer bufferConsumer = 
bufferConsumers[targetChannel].get();
-               bufferConsumers[targetChannel] = Optional.empty();
+               BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+               bufferBuilders[targetChannel] = Optional.empty();
 
-               numBytesOut.inc(bufferConsumer.getWrittenBytes());
+               numBytesOut.inc(bufferBuilder.finish());
                serializer.clear();
-               targetPartition.addBufferConsumer(bufferConsumer, 
targetChannel);
                return true;
        }
 
+       private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+               checkState(!bufferBuilders[targetChannel].isPresent());
+               BufferBuilder bufferBuilder = 
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+               bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
+               
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 
targetChannel);
+               return bufferBuilder;
+       }
+
        private void closeBufferConsumer(int targetChannel) {
-               if (bufferConsumers[targetChannel].isPresent()) {
-                       bufferConsumers[targetChannel].get().close();
-                       bufferConsumers[targetChannel] = Optional.empty();
+               if (bufferBuilders[targetChannel].isPresent()) {
+                       bufferBuilders[targetChannel].get().finish();
+                       bufferBuilders[targetChannel] = Optional.empty();
                }
        }
 }
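
To make the new write path concrete: the BufferConsumer is now handed to the target subpartition as soon as the BufferBuilder is requested, and the builder is only finished later, once it is full or the writer is flushed. A minimal sketch of that flow, using only the methods visible in this hunk (it is not the actual RecordWriter code and assumes it compiles inside flink-runtime):

    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
    import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

    class EagerHandOffSketch {
        // Request a builder, expose its consumer to the subpartition right away,
        // keep writing into the builder, and finish it once the buffer is full.
        static void writeOneBuffer(ResultPartitionWriter partition, int channel)
                throws java.io.IOException, InterruptedException {
            BufferBuilder builder = partition.getBufferProvider().requestBufferBuilderBlocking();
            BufferConsumer consumer = builder.createBufferConsumer();
            // The consumer is added while the builder is still unfinished.
            partition.addBufferConsumer(consumer, channel);
            // ... the serializer appends serialized records to 'builder' here ...
            // Finishing the builder makes the remaining data visible to the reader side.
            builder.finish();
        }
    }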

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index caefb52..02049d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -45,6 +45,14 @@ public interface ResultPartitionWriter {
         *
         * <p>This method takes the ownership of the passed {@code 
bufferConsumer} and thus is responsible for releasing
         * it's resources.
+        *
+        * <p>To avoid problems with data re-ordering, before adding a new {@link BufferConsumer} the previously
+        * added one on the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}.
         */
        void addBufferConsumer(BufferConsumer bufferConsumer, int 
subpartitionIndex) throws IOException;
+
+       /**
+        * Manually trigger consumption from enqueued {@link BufferConsumer 
BufferConsumers}.
+        */
+       void flush();
 }
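
In words, the contract documented here: a producer may add a BufferConsumer that is still being written to, but it has to finish it before adding the next one for the same subpartition, and it can call flush() at any time to make the data written so far visible to the consumer. A hedged usage fragment (names like 'writer' and 'builder' are placeholders, not from this patch):

    // writer : some ResultPartitionWriter, builder : the BufferBuilder backing 'first'
    BufferConsumer first = builder.createBufferConsumer();
    writer.addBufferConsumer(first, 0);  // may still be unfinished at this point
    writer.flush();                      // manually trigger consumption of what was already written
    builder.finish();                    // required before the next addBufferConsumer(..., 0)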

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
index 5ebf62d..d02b2bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
-import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the new network 
credit-based mode.
@@ -44,7 +44,7 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
        private final InputChannelID receiverId;
 
-       private final AtomicLong numBuffersAvailable = new AtomicLong();
+       private final AtomicBoolean buffersAvailable = new AtomicBoolean();
 
        private final PartitionRequestQueue requestQueue;
 
@@ -118,7 +118,7 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
        @Override
        public boolean isAvailable() {
                // BEWARE: this must be in sync with #isAvailable()!
-               return numBuffersAvailable.get() > 0 &&
+               return buffersAvailable.get() &&
                        (numCreditsAvailable > 0 || 
subpartitionView.nextBufferIsEvent());
        }
 
@@ -131,11 +131,9 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
         *
         * @param bufferAndBacklog
         *              current buffer and backlog including information about 
the next buffer
-        * @param remaining
-        *              remaining number of queued buffers, i.e. 
<tt>numBuffersAvailable.get()</tt>
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog, long 
remaining) {
-               return remaining > 0 &&
+       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+               return bufferAndBacklog.isMoreAvailable() &&
                        (numCreditsAvailable > 0 || 
bufferAndBacklog.nextBufferIsEvent());
        }
 
@@ -155,27 +153,23 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
        }
 
        @VisibleForTesting
-       long getNumBuffersAvailable() {
-               return numBuffersAvailable.get();
+       boolean hasBuffersAvailable() {
+               return buffersAvailable.get();
        }
 
        @Override
        public BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
                BufferAndBacklog next = subpartitionView.getNextBuffer();
                if (next != null) {
-                       long remaining = numBuffersAvailable.decrementAndGet();
+                       buffersAvailable.set(next.isMoreAvailable());
                        sequenceNumber++;
 
-                       if (remaining < 0) {
-                               throw new IllegalStateException("no buffer 
available");
-                       }
-
                        if (next.buffer().isBuffer() && --numCreditsAvailable < 
0) {
                                throw new IllegalStateException("no credit 
available");
                        }
 
                        return new BufferAndAvailability(
-                               next.buffer(), isAvailable(next, remaining), 
next.buffersInBacklog());
+                               next.buffer(), isAvailable(next), 
next.buffersInBacklog());
                } else {
                        return null;
                }
@@ -202,11 +196,9 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
        }
 
        @Override
-       public void notifyBuffersAvailable(long numBuffers) {
-               // if this request made the channel non-empty, notify the input 
gate
-               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
-                       requestQueue.notifyReaderNonEmpty(this);
-               }
+       public void notifyDataAvailable() {
+               buffersAvailable.set(true);
+               requestQueue.notifyReaderNonEmpty(this);
        }
 
        @Override
@@ -214,7 +206,7 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
                return "CreditBasedSequenceNumberingViewReader{" +
                        "requestLock=" + requestLock +
                        ", receiverId=" + receiverId +
-                       ", numBuffersAvailable=" + numBuffersAvailable.get() +
+                       ", buffersAvailable=" + buffersAvailable.get() +
                        ", sequenceNumber=" + sequenceNumber +
                        ", numCreditsAvailable=" + numCreditsAvailable +
                        ", isRegisteredAsAvailable=" + isRegisteredAsAvailable +

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 4832442..8d43815 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -103,18 +103,17 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
         * availability, so there is no race condition here.
         */
        private void enqueueAvailableReader(final NetworkSequenceViewReader 
reader) throws Exception {
-               if (!reader.isRegisteredAsAvailable() && reader.isAvailable()) {
-                       // Queue an available reader for consumption. If the 
queue is empty,
-                       // we try trigger the actual write. Otherwise this will 
be handled by
-                       // the writeAndFlushNextMessageIfPossible calls.
-                       boolean triggerWrite = availableReaders.isEmpty();
-                       availableReaders.add(reader);
-
-                       reader.setRegisteredAsAvailable(true);
-
-                       if (triggerWrite) {
-                               
writeAndFlushNextMessageIfPossible(ctx.channel());
-                       }
+               if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
+                       return;
+               }
+               // Queue an available reader for consumption. If the queue is empty,
+               // we try to trigger the actual write. Otherwise this will be handled
+               // by the writeAndFlushNextMessageIfPossible calls.
+               boolean triggerWrite = availableReaders.isEmpty();
+               registerAvailableReader(reader);
+
+               if (triggerWrite) {
+                       writeAndFlushNextMessageIfPossible(ctx.channel());
                }
        }
 
@@ -183,13 +182,12 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                        // Cancel the request for the input channel
                        int size = availableReaders.size();
                        for (int i = 0; i < size; i++) {
-                               NetworkSequenceViewReader reader = 
availableReaders.poll();
+                               NetworkSequenceViewReader reader = 
pollAvailableReader();
                                if (reader.getReceiverId().equals(toCancel)) {
                                        reader.releaseAllResources();
-                                       reader.setRegisteredAsAvailable(false);
                                        markAsReleased(reader.getReceiverId());
                                } else {
-                                       availableReaders.add(reader);
+                                       registerAvailableReader(reader);
                                }
                        }
 
@@ -216,7 +214,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                BufferAndAvailability next = null;
                try {
                        while (true) {
-                               NetworkSequenceViewReader reader = 
availableReaders.poll();
+                               NetworkSequenceViewReader reader = 
pollAvailableReader();
 
                                // No queue with available data. We allow this 
here, because
                                // of the write callbacks that are executed 
after each write.
@@ -226,32 +224,24 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
 
                                next = reader.getNextBuffer();
                                if (next == null) {
-                                       if (reader.isReleased()) {
-                                               
markAsReleased(reader.getReceiverId());
+                                       if (!reader.isReleased()) {
+                                               continue;
+                                       }
+                                       markAsReleased(reader.getReceiverId());
 
-                                               Throwable cause = 
reader.getFailureCause();
-                                               if (cause != null) {
-                                                       ErrorResponse msg = new 
ErrorResponse(
-                                                               new 
ProducerFailedException(cause),
-                                                               
reader.getReceiverId());
-
-                                                       ctx.writeAndFlush(msg);
-                                               }
-                                       } else {
-                                               IllegalStateException err = new 
IllegalStateException(
-                                                       "Bug in Netty consumer 
logic: reader queue got notified by partition " +
-                                                               "about 
available data, but none was available.");
-                                               handleException(ctx.channel(), 
err);
-                                               return;
+                                       Throwable cause = 
reader.getFailureCause();
+                                       if (cause != null) {
+                                               ErrorResponse msg = new 
ErrorResponse(
+                                                       new 
ProducerFailedException(cause),
+                                                       reader.getReceiverId());
+
+                                               ctx.writeAndFlush(msg);
                                        }
                                } else {
                                        // This channel was now removed from 
the available reader queue.
-                                       // We re-add it into the queue if it is 
still available, otherwise we will
-                                       // notify the reader about the changed 
channel availability registration.
+                                       // We re-add it into the queue if it is 
still available
                                        if (next.moreAvailable()) {
-                                               availableReaders.add(reader);
-                                       } else {
-                                               
reader.setRegisteredAsAvailable(false);
+                                               registerAvailableReader(reader);
                                        }
 
                                        BufferResponse msg = new BufferResponse(
@@ -283,6 +273,19 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                }
        }
 
+       private void registerAvailableReader(NetworkSequenceViewReader reader) {
+               availableReaders.add(reader);
+               reader.setRegisteredAsAvailable(true);
+       }
+
+       private NetworkSequenceViewReader pollAvailableReader() {
+               NetworkSequenceViewReader reader = availableReaders.poll();
+               if (reader != null) {
+                       reader.setRegisteredAsAvailable(false);
+               }
+               return reader;
+       }
+
        private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException 
{
                return EventSerializer.isEvent(buffer, 
EndOfPartitionEvent.class,
                        getClass().getClassLoader());
@@ -301,7 +304,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
        }
 
        private void handleException(Channel channel, Throwable cause) throws 
IOException {
-               LOG.debug("Encountered error while consuming partitions", 
cause);
+               LOG.error("Encountered error while consuming partitions", 
cause);
 
                fatalError = true;
                releaseAllResources();
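
The two new helpers exist to keep the isRegisteredAsAvailable flag consistent with membership in availableReaders; every add and poll now goes through them. The intended invariant, as a sketch (not part of the patch):

    registerAvailableReader(reader);                          // enqueue + setRegisteredAsAvailable(true)
    NetworkSequenceViewReader next = pollAvailableReader();   // dequeue + setRegisteredAsAvailable(false)
    if (next != null && next.isAvailable()) {
        registerAvailableReader(next);                        // re-enqueue only while it still has data
    }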

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 0ec5fcb..2d9635c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -19,17 +19,17 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
-import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the old network mode.
@@ -43,7 +43,7 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener, Network
 
        private final InputChannelID receiverId;
 
-       private final AtomicLong numBuffersAvailable = new AtomicLong();
+       private final AtomicBoolean buffersAvailable = new AtomicBoolean();
 
        private final PartitionRequestQueue requestQueue;
 
@@ -51,6 +51,8 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener, Network
 
        private int sequenceNumber = -1;
 
+       private boolean isRegisteredAvailable;
+
        SequenceNumberingViewReader(InputChannelID receiverId, 
PartitionRequestQueue requestQueue) {
                this.receiverId = receiverId;
                this.requestQueue = requestQueue;
@@ -84,16 +86,17 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener, Network
 
        @Override
        public void setRegisteredAsAvailable(boolean isRegisteredAvailable) {
+               this.isRegisteredAvailable = isRegisteredAvailable;
        }
 
        @Override
        public boolean isRegisteredAsAvailable() {
-               return false;
+               return isRegisteredAvailable;
        }
 
        @Override
        public boolean isAvailable() {
-               return true;
+               return buffersAvailable.get();
        }
 
        @Override
@@ -110,14 +113,9 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener, Network
        public BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
                BufferAndBacklog next = subpartitionView.getNextBuffer();
                if (next != null) {
-                       long remaining = numBuffersAvailable.decrementAndGet();
+                       buffersAvailable.set(next.isMoreAvailable());
                        sequenceNumber++;
-
-                       if (remaining >= 0) {
-                               return new BufferAndAvailability(next.buffer(), 
remaining > 0, next.buffersInBacklog());
-                       } else {
-                               throw new IllegalStateException("no buffer 
available");
-                       }
+                       return new BufferAndAvailability(next.buffer(), 
next.isMoreAvailable(), next.buffersInBacklog());
                } else {
                        return null;
                }
@@ -144,11 +142,9 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener, Network
        }
 
        @Override
-       public void notifyBuffersAvailable(long numBuffers) {
-               // if this request made the channel non-empty, notify the input 
gate
-               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
-                       requestQueue.notifyReaderNonEmpty(this);
-               }
+       public void notifyDataAvailable() {
+               buffersAvailable.set(true);
+               requestQueue.notifyReaderNonEmpty(this);
        }
 
        @Override
@@ -156,7 +152,6 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener, Network
                return "SequenceNumberingViewReader{" +
                        "requestLock=" + requestLock +
                        ", receiverId=" + receiverId +
-                       ", numBuffersAvailable=" + numBuffersAvailable.get() +
                        ", sequenceNumber=" + sequenceNumber +
                        '}';
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
index 114ef7c..e78f99a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
@@ -25,9 +25,7 @@ package org.apache.flink.runtime.io.network.partition;
 public interface BufferAvailabilityListener {
 
        /**
-        * Called whenever a new number of buffers becomes available.
-        *
-        * @param numBuffers The number of buffers that became available.
+        * Called whenever there might be new data available.
         */
-       void notifyBuffersAvailable(long numBuffers);
+       void notifyDataAvailable();
 }
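
The diffstat above adds two test implementations of this interface (NoOpBufferAvailablityListener and AwaitableBufferAvailablityListener). A comparable minimal implementation, counting notifications now that the callback no longer carries a buffer count (a sketch, not one of those test classes):

    import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;

    class CountingAvailabilityListener implements BufferAvailabilityListener {
        private long numNotifications;

        @Override
        public void notifyDataAvailable() {
            numNotifications++;
        }

        long getNumNotifications() {
            return numNotifications;
        }
    }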

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2fa512a..dcaa360 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -63,6 +63,15 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
+       public void flush() {
+               synchronized (buffers) {
+                       if (readView != null) {
+                               readView.notifyDataAvailable();
+                       }
+               }
+       }
+
+       @Override
        public void finish() throws IOException {
                
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
                LOG.debug("Finished {}.", this);
@@ -84,10 +93,10 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                        if (finish) {
                                isFinished = true;
+                               notifyDataAvailable();
                        }
-
-                       if (readView != null) {
-                               readView.notifyBuffersAvailable(1);
+                       else {
+                               maybeNotifyDataAvailable();
                        }
                }
 
@@ -127,19 +136,42 @@ class PipelinedSubpartition extends ResultSubpartition {
        @Nullable
        BufferAndBacklog pollBuffer() {
                synchronized (buffers) {
-                       BufferConsumer bufferConsumer = buffers.peek();
-                       if (bufferConsumer == null) {
-                               return null;
+                       Buffer buffer = null;
+
+                       while (!buffers.isEmpty()) {
+                               BufferConsumer bufferConsumer = buffers.peek();
+
+                               buffer = bufferConsumer.build();
+                               checkState(bufferConsumer.isFinished() || 
buffers.size() == 1,
+                                       "When there are multiple buffers, an 
unfinished bufferConsumer can not be at the head of the buffers queue.");
+
+                               if (bufferConsumer.isFinished()) {
+                                       buffers.pop().close();
+                                       
decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
+                               }
+                               if (buffer.readableBytes() > 0) {
+                                       break;
+                               }
+                               buffer.recycleBuffer();
+                               buffer = null;
+                               if (!bufferConsumer.isFinished()) {
+                                       break;
+                               }
                        }
 
-                       Buffer buffer = bufferConsumer.build();
-                       if (bufferConsumer.isFinished()) {
-                               buffers.pop().close();
-                               
decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
+                       if (buffer == null) {
+                               return null;
                        }
 
                        updateStatistics(buffer);
-                       return new BufferAndBacklog(buffer, 
getBuffersInBacklog(), _nextBufferIsEvent());
+                       // Do not report the last remaining buffer in the queue as available to read
+                       // (assuming it is unfinished). It will be reported for reading either on flush
+                       // or once the number of buffers in the queue reaches 2 or more.
+                       return new BufferAndBacklog(
+                               buffer,
+                               getNumberOfFinishedBuffers() > 0,
+                               getBuffersInBacklog(),
+                               _nextBufferIsEvent());
                }
        }
 
@@ -169,8 +201,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        @Override
        public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException {
-               final int queueSize;
-
                synchronized (buffers) {
                        checkState(!isReleased);
                        checkState(readView == null,
@@ -179,12 +209,12 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                        LOG.debug("Creating read view for subpartition {} of 
partition {}.", index, parent.getPartitionId());
 
-                       queueSize = buffers.size();
                        readView = new PipelinedSubpartitionView(this, 
availabilityListener);
+                       if (!buffers.isEmpty()) {
+                               readView.notifyDataAvailable();
+                       }
                }
 
-               readView.notifyBuffersAvailable(queueSize);
-
                return readView;
        }
 
@@ -220,4 +250,26 @@ class PipelinedSubpartition extends ResultSubpartition {
                // since we do not synchronize, the size may actually be lower 
than 0!
                return Math.max(buffers.size(), 0);
        }
+
+       private void maybeNotifyDataAvailable() {
+               // Notify only when we have added the first finished buffer.
+               if (getNumberOfFinishedBuffers() == 1) {
+                       notifyDataAvailable();
+               }
+       }
+
+       private void notifyDataAvailable() {
+               if (readView != null) {
+                       readView.notifyDataAvailable();
+               }
+       }
+
+       private int getNumberOfFinishedBuffers() {
+               if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
+                       return 1;
+               }
+
+               // We assume that only the last buffer is not finished.
+               return Math.max(0, buffers.size() - 1);
+       }
 }
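
Seen from the consumer side, the logic above means a single unfinished BufferConsumer does not trigger a notification by itself; the data it already holds becomes readable on flush() or once a second (finished) buffer is added. A test-style fragment illustrating that, under the assumption it runs in the same package as the subpartition (hypothetical, not from this patch):

    // One unfinished buffer enqueued: maybeNotifyDataAvailable() stays silent.
    subpartition.add(bufferBuilder.createBufferConsumer());
    // flush() notifies the read view about the partially written data.
    subpartition.flush();
    ResultSubpartition.BufferAndBacklog result = subpartition.pollBuffer();
    // result.isMoreAvailable() is false here: the lone consumer is still unfinished.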

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 21abd04..c60a604 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -52,8 +52,8 @@ class PipelinedSubpartitionView implements 
ResultSubpartitionView {
        }
 
        @Override
-       public void notifyBuffersAvailable(long numBuffers) {
-               availabilityListener.notifyBuffersAvailable(numBuffers);
+       public void notifyDataAvailable() {
+               availabilityListener.notifyDataAvailable();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 9be261e..25a076b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -257,6 +257,13 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
                }
        }
 
+       @Override
+       public void flush() {
+               for (ResultSubpartition subpartition : subpartitions) {
+                       subpartition.flush();
+               }
+       }
+
        /**
         * Finishes the result partition.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 572cde7..adc0ed3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -93,6 +93,9 @@ public abstract class ResultSubpartition {
         * <p>The request may be executed synchronously, or asynchronously, 
depending on the
         * implementation.
         *
+        * <p><strong>IMPORTANT:</strong> Before adding a new {@link BufferConsumer}, the previously added one must be
+        * in a finished state. For performance reasons, this is only enforced during data reading.
+        *
         * @param bufferConsumer
         *              the buffer to add (transferring ownership to this 
writer)
         * @return true if operation succeeded and bufferConsumer was enqueued 
for consumption.
@@ -101,6 +104,8 @@ public abstract class ResultSubpartition {
         */
        abstract public boolean add(BufferConsumer bufferConsumer) throws 
IOException;
 
+       abstract public void flush();
+
        abstract public void finish() throws IOException;
 
        abstract public void release() throws IOException;
@@ -170,12 +175,14 @@ public abstract class ResultSubpartition {
        public static final class BufferAndBacklog {
 
                private final Buffer buffer;
+               private final boolean isMoreAvailable;
                private final int buffersInBacklog;
                private final boolean nextBufferIsEvent;
 
-               public BufferAndBacklog(Buffer buffer, int buffersInBacklog, 
boolean nextBufferIsEvent) {
+               public BufferAndBacklog(Buffer buffer, boolean isMoreAvailable, 
int buffersInBacklog, boolean nextBufferIsEvent) {
                        this.buffer = checkNotNull(buffer);
                        this.buffersInBacklog = buffersInBacklog;
+                       this.isMoreAvailable = isMoreAvailable;
                        this.nextBufferIsEvent = nextBufferIsEvent;
                }
 
@@ -183,10 +190,15 @@ public abstract class ResultSubpartition {
                        return buffer;
                }
 
+               public boolean isMoreAvailable() {
+                       return isMoreAvailable;
+               }
+
                public int buffersInBacklog() {
                        return buffersInBacklog;
                }
 
+
                public boolean nextBufferIsEvent() {
                        return nextBufferIsEvent;
                }
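
With the extra constructor argument, every producer of a BufferAndBacklog now states explicitly whether the reader should stay enqueued, instead of the consumer inferring it from a separate counter. Construction mirrors the call sites in this commit (placeholder variable names):

    BufferAndBacklog bufferAndBacklog = new BufferAndBacklog(
        buffer,
        /* isMoreAvailable */ finishedBuffersLeft > 0,
        buffersInBacklog,
        nextBufferIsEvent);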

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 9b0344e..41fbb0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -43,7 +43,7 @@ public interface ResultSubpartitionView {
        @Nullable
        BufferAndBacklog getNextBuffer() throws IOException, 
InterruptedException;
 
-       void notifyBuffersAvailable(long buffers);
+       void notifyDataAvailable();
 
        void releaseAllResources() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4b9f59f..8758b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -115,11 +115,22 @@ class SpillableSubpartition extends ResultSubpartition {
        }
 
        @Override
+       public void flush() {
+               synchronized (buffers) {
+                       if (readView != null) {
+                               readView.notifyDataAvailable();
+                       }
+               }
+       }
+
+       @Override
        public synchronized void finish() throws IOException {
                synchronized (buffers) {
                        if 
(add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE))) {
                                isFinished = true;
                        }
+
+                       flush();
                }
 
                // If we are spilling/have spilled, wait for the writer to 
finish

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 6c173a3..789b3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -89,7 +89,7 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                }
 
                if (nextBuffer != null) {
-                       listener.notifyBuffersAvailable(1);
+                       listener.notifyDataAvailable();
                }
        }
 
@@ -143,20 +143,24 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                Buffer current = null;
                boolean nextBufferIsEvent = false;
                int newBacklog = 0; // this is always correct if current is 
non-null!
+               boolean isMoreAvailable = false;
 
                synchronized (buffers) {
                        if (isReleased.get()) {
                                return null;
                        } else if (nextBuffer != null) {
                                current = nextBuffer.build();
+
                                if (nextBuffer.isFinished()) {
                                        newBacklog = 
parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
                                        nextBuffer.close();
                                        nextBuffer = buffers.poll();
                                }
 
+                               isMoreAvailable = buffers.size() > 0;
                                if (nextBuffer != null) {
-                                       listener.notifyBuffersAvailable(1);
+                                       isMoreAvailable = true;
+                                       listener.notifyDataAvailable();
                                        nextBufferIsEvent = 
!nextBuffer.isBuffer();
                                }
 
@@ -164,7 +168,7 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                                // if we are spilled (but still process a 
non-spilled nextBuffer), we don't know the
                                // state of nextBufferIsEvent...
                                if (spilledView == null) {
-                                       return new BufferAndBacklog(current, 
newBacklog, nextBufferIsEvent);
+                                       return new BufferAndBacklog(current, 
isMoreAvailable, newBacklog, nextBufferIsEvent);
                                }
                        }
                } // else: spilled
@@ -172,7 +176,7 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                SpilledSubpartitionView spilled = spilledView;
                if (spilled != null) {
                        if (current != null) {
-                               return new BufferAndBacklog(current, 
newBacklog, spilled.nextBufferIsEvent());
+                               return new BufferAndBacklog(current, 
isMoreAvailable, newBacklog, spilled.nextBufferIsEvent());
                        } else {
                                return spilled.getNextBuffer();
                        }
@@ -182,7 +186,7 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
        }
 
        @Override
-       public void notifyBuffersAvailable(long buffers) {
+       public void notifyDataAvailable() {
                // We do the availability listener notification one by one
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index d1917e6..4c5cd2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -105,7 +105,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
                // Otherwise, we notify only when the spill writer callback 
happens.
                if (!spillWriter.registerAllRequestsProcessedListener(this)) {
                        isSpillInProgress = false;
-                       
availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+                       availabilityListener.notifyDataAvailable();
                        LOG.debug("No spilling in progress. Notified about {} 
available buffers.", numberOfSpilledBuffers);
                } else {
                        LOG.debug("Spilling in progress. Waiting with 
notification about {} available buffers.", numberOfSpilledBuffers);
@@ -120,7 +120,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
        @Override
        public void onNotification() {
                isSpillInProgress = false;
-               
availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+               availabilityListener.notifyDataAvailable();
                LOG.debug("Finished spilling. Notified about {} available 
buffers.", numberOfSpilledBuffers);
        }
 
@@ -148,7 +148,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
                }
 
                int newBacklog = parent.decreaseBuffersInBacklog(current);
-               return new BufferAndBacklog(current, newBacklog, 
nextBufferIsEvent);
+               return new BufferAndBacklog(current, newBacklog > 0, 
newBacklog, nextBufferIsEvent);
        }
 
        @Nullable
@@ -166,7 +166,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
        }
 
        @Override
-       public void notifyBuffersAvailable(long buffers) {
+       public void notifyDataAvailable() {
                // We do the availability listener notification either directly 
on
                // construction of this view (when everything has been spilled) 
or
                // as soon as spilling is done and we are notified about it in 
the
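
As an aside, the listener contract above no longer carries a buffer count: notifyDataAvailable() only signals that data exists, and the consumer is expected to keep polling until it runs dry. Below is a minimal, self-contained sketch of that handshake; all names are hypothetical stand-ins, not the Flink classes from this patch.

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class NotificationSketch {

        // hypothetical stand-in for BufferAvailabilityListener: no buffer count any more
        interface DataAvailabilityListener {
            void notifyDataAvailable();
        }

        public static void main(String[] args) {
            Queue<String> buffers = new ArrayDeque<>();

            // the consumer reacts to a single notification by draining until empty,
            // instead of decrementing a received buffer count
            DataAvailabilityListener listener = () -> {
                String next;
                while ((next = buffers.poll()) != null) {
                    System.out.println("consumed " + next);
                }
            };

            // producer side: add data, then fire one count-less notification
            buffers.add("buffer-1");
            buffers.add("buffer-2");
            listener.notifyDataAvailable();
        }
    }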

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 7b7edf7..3ce5866 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -134,9 +135,9 @@ public abstract class InputChannel {
        abstract void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException;
 
        /**
-        * Returns the next buffer from the consumed subpartition.
+        * Returns the next buffer from the consumed subpartition or {@code 
Optional.empty()} if there is no data to return.
         */
-       abstract BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException;
+       abstract Optional<BufferAndAvailability> getNextBuffer() throws 
IOException, InterruptedException;
 
        // 
------------------------------------------------------------------------
        // Task events
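
A short illustration of the Optional-based contract described in the javadoc above: an empty result means "no data right now" and is a legal outcome that the caller retries on, not an error. The types below are hypothetical stand-ins, not the actual InputChannel API.

    import java.util.ArrayDeque;
    import java.util.Optional;
    import java.util.Queue;

    public class OptionalPollSketch {

        // hypothetical analogue of InputChannel#getNextBuffer(): an empty Optional
        // means "nothing to hand out right now"
        static Optional<String> getNextBuffer(Queue<String> channel) {
            return Optional.ofNullable(channel.poll());
        }

        public static void main(String[] args) {
            Queue<String> channel = new ArrayDeque<>();
            channel.add("record-1");

            getNextBuffer(channel).ifPresent(buffer -> System.out.println("got " + buffer));

            // the second poll legitimately comes back empty; the caller simply waits
            // for the next notifyDataAvailable() and tries again instead of failing
            System.out.println("second poll present: " + getNextBuffer(channel).isPresent());
        }
    }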

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 8505666..f9c75ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -23,19 +23,19 @@ import 
org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
-import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -57,9 +57,6 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
        /** Task event dispatcher for backwards events. */
        private final TaskEventDispatcher taskEventDispatcher;
 
-       /** Number of available buffers used to keep track of non-empty gate 
notifications. */
-       private final AtomicLong numBuffersAvailable;
-
        /** The consumed subpartition */
        private volatile ResultSubpartitionView subpartitionView;
 
@@ -91,7 +88,6 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
-               this.numBuffersAvailable = new AtomicLong();
        }
 
        // 
------------------------------------------------------------------------
@@ -166,11 +162,19 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
        }
 
        @Override
-       BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
+       Optional<BufferAndAvailability> getNextBuffer() throws IOException, 
InterruptedException {
                checkError();
 
                ResultSubpartitionView subpartitionView = this.subpartitionView;
                if (subpartitionView == null) {
+                       // There is a possible race condition between writing an 
EndOfPartitionEvent (1) and flushing (3) the Local
+                       // channel on the sender side, and reading the 
EndOfPartitionEvent (2) and processing the flush notification (4). When
+                       // they happen in that order (1 - 2 - 3 - 4), the flush 
notification can re-enqueue the LocalInputChannel after (or
+                       // while) it is released while reading the 
EndOfPartitionEvent (2).
+                       if (isReleased) {
+                               return Optional.empty();
+                       }
+
                        // this can happen if the request for the partition was 
triggered asynchronously
                        // by the time trigger
                        // would be good to avoid that, by guaranteeing that 
the requestPartition() and
@@ -185,31 +189,17 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                        if (subpartitionView.isReleased()) {
                                throw new CancelTaskException("Consumed 
partition " + subpartitionView + " has been released.");
                        } else {
-                               // This means there is a bug in the buffer 
availability
-                               // notifications.
-                               throw new IllegalStateException("Consumed 
partition has no buffers available. " +
-                                       "Number of received buffer 
notifications is " + numBuffersAvailable + ".");
+                               return Optional.empty();
                        }
                }
 
-               long remaining = numBuffersAvailable.decrementAndGet();
-
-               if (remaining >= 0) {
-                       numBytesIn.inc(next.buffer().getSizeUnsafe());
-                       return new BufferAndAvailability(next.buffer(), 
remaining > 0, next.buffersInBacklog());
-               } else if (subpartitionView.isReleased()) {
-                       throw new 
ProducerFailedException(subpartitionView.getFailureCause());
-               } else {
-                       throw new IllegalStateException("No buffer available 
and producer partition not released.");
-               }
+               numBytesIn.inc(next.buffer().getSizeUnsafe());
+               return Optional.of(new BufferAndAvailability(next.buffer(), 
next.isMoreAvailable(), next.buffersInBacklog()));
        }
 
        @Override
-       public void notifyBuffersAvailable(long numBuffers) {
-               // if this request made the channel non-empty, notify the input 
gate
-               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
-                       notifyChannelNonEmpty();
-               }
+       public void notifyDataAvailable() {
+               notifyChannelNonEmpty();
        }
 
        private ResultSubpartitionView checkAndWaitForSubpartitionView() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 8a8c7f5..8174359 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -42,6 +42,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -183,7 +184,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
        }
 
        @Override
-       BufferAndAvailability getNextBuffer() throws IOException {
+       Optional<BufferAndAvailability> getNextBuffer() throws IOException {
                checkState(!isReleased.get(), "Queried for a buffer after 
channel has been closed.");
                checkState(partitionRequestClient != null, "Queried for a 
buffer before requesting a queue.");
 
@@ -198,7 +199,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                }
 
                numBytesIn.inc(next.getSizeUnsafe());
-               return new BufferAndAvailability(next, remaining > 0, 
getSenderBacklog());
+               return Optional.of(new BufferAndAvailability(next, remaining > 
0, getSenderBacklog()));
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 337b3c2..04b8ee6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -509,39 +509,39 @@ public class SingleInputGate implements InputGate {
 
                InputChannel currentChannel;
                boolean moreAvailable;
-               synchronized (inputChannelsWithData) {
-                       while (inputChannelsWithData.size() == 0) {
-                               if (isReleased) {
-                                       throw new 
IllegalStateException("Released");
-                               }
+               Optional<BufferAndAvailability> result = Optional.empty();
 
-                               if (blocking) {
-                                       inputChannelsWithData.wait();
-                               }
-                               else {
-                                       return Optional.empty();
+               do {
+                       synchronized (inputChannelsWithData) {
+                               while (inputChannelsWithData.size() == 0) {
+                                       if (isReleased) {
+                                               throw new 
IllegalStateException("Released");
+                                       }
+
+                                       if (blocking) {
+                                               inputChannelsWithData.wait();
+                                       }
+                                       else {
+                                               return Optional.empty();
+                                       }
                                }
+
+                               currentChannel = inputChannelsWithData.remove();
+                               
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+                               moreAvailable = inputChannelsWithData.size() > 
0;
                        }
 
-                       currentChannel = inputChannelsWithData.remove();
-                       
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-                       moreAvailable = inputChannelsWithData.size() > 0;
-               }
+                       result = currentChannel.getNextBuffer();
+               } while (!result.isPresent());
 
-               final BufferAndAvailability result = 
currentChannel.getNextBuffer();
-               // Sanity check that notifications only happen when data is 
available
-               if (result == null) {
-                       throw new IllegalStateException("Bug in input 
gate/channel logic: input gate got " +
-                                       "notified by channel about available 
data, but none was available.");
-               }
                // this channel was now removed from the non-empty channels 
queue
                // we re-add it in case it has more data, because in that case 
no "non-empty" notification
                // will come for that channel
-               if (result.moreAvailable()) {
+               if (result.get().moreAvailable()) {
                        queueChannel(currentChannel);
                }
 
-               final Buffer buffer = result.buffer();
+               final Buffer buffer = result.get().buffer();
                if (buffer.isBuffer()) {
                        return Optional.of(new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable));
                }
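
The loop above keeps dequeuing channels until one actually yields data, and re-enqueues a channel that still has more, since no further notification will arrive for it. The following is a compact, self-contained sketch of that polling shape, with hypothetical types and without the blocking and synchronization of the real gate.

    import java.util.ArrayDeque;
    import java.util.Optional;

    public class GatePollingSketch {

        // hypothetical channel: a queue of records plus a flag whether more remain
        static class Channel {
            final ArrayDeque<String> records = new ArrayDeque<>();

            Optional<String> getNext() {
                return Optional.ofNullable(records.poll());
            }

            boolean moreAvailable() {
                return !records.isEmpty();
            }
        }

        // keep dequeuing channels until one yields data; re-enqueue a channel
        // that still has data, because no new notification will come for it
        static Optional<String> pollNext(ArrayDeque<Channel> channelsWithData) {
            Optional<String> result = Optional.empty();
            Channel current = null;
            while (!result.isPresent()) {
                current = channelsWithData.poll();
                if (current == null) {
                    return Optional.empty();   // nothing queued at all
                }
                result = current.getNext();    // may be empty, e.g. after a release race
            }
            if (current.moreAvailable()) {
                channelsWithData.add(current);
            }
            return result;
        }

        public static void main(String[] args) {
            ArrayDeque<Channel> channelsWithData = new ArrayDeque<>();
            Channel emptyChannel = new Channel();   // notified, but drained in the meantime
            Channel fullChannel = new Channel();
            fullChannel.records.add("a");
            fullChannel.records.add("b");
            channelsWithData.add(emptyChannel);
            channelsWithData.add(fullChannel);

            System.out.println(pollNext(channelsWithData).get()); // prints "a"
            System.out.println(pollNext(channelsWithData).get()); // prints "b"
        }
    }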

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 14c04bc..5a547ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -26,12 +26,14 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Input gate wrapper to union the input from multiple input gates.
@@ -71,6 +73,11 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
        /** Gates, which notified this input gate about available data. */
        private final ArrayDeque<InputGate> inputGatesWithData = new 
ArrayDeque<>();
 
+       /**
+        * Guard against enqueuing an {@link InputGate} multiple times on 
{@code inputGatesWithData}.
+        */
+       private final Set<InputGate> enqueuedInputGatesWithData = new 
HashSet<>();
+
        /** The total number of input channels across all unioned input gates. 
*/
        private final int totalNumberOfInputChannels;
 
@@ -163,12 +170,20 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                        && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
                        && inputGate.isFinished()) {
 
+                       checkState(!bufferOrEvent.moreAvailable());
                        if (!inputGatesWithRemainingData.remove(inputGate)) {
                                throw new IllegalStateException("Couldn't find 
input gate in set of remaining " +
                                        "input gates.");
                        }
                }
 
+               if (bufferOrEvent.moreAvailable()) {
+                       // this gate was now removed from the 
non-empty gates queue
+                       // we re-add it in case it has more data, because in 
that case no "non-empty" notification
+                       // will come for that gate
+                       queueInputGate(inputGate);
+               }
+
                // Set the channel index to identify the input channel (across 
all unioned input gates)
                final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
 
@@ -190,6 +205,7 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                                        inputGatesWithData.wait();
                                }
                                inputGate = inputGatesWithData.remove();
+                               enqueuedInputGatesWithData.remove(inputGate);
                        }
 
                        // In case of inputGatesWithData being inaccurate do 
not block on an empty inputGate, but just poll the data.
@@ -248,9 +264,14 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                int availableInputGates;
 
                synchronized (inputGatesWithData) {
+                       if (enqueuedInputGatesWithData.contains(inputGate)) {
+                               return;
+                       }
+
                        availableInputGates = inputGatesWithData.size();
 
                        inputGatesWithData.add(inputGate);
+                       enqueuedInputGatesWithData.add(inputGate);
 
                        if (availableInputGates == 0) {
                                inputGatesWithData.notifyAll();
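
The enqueuedInputGatesWithData set above implements an "enqueue at most once" guard around the ArrayDeque. Here is a self-contained sketch of the same pattern, with plain strings standing in for input gates.

    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.Set;

    public class EnqueueOnceSketch {

        private final ArrayDeque<String> gatesWithData = new ArrayDeque<>();
        private final Set<String> enqueuedGates = new HashSet<>();

        // a gate that is already queued is not queued a second time,
        // so repeated notifications cannot inflate the queue
        synchronized void queueGate(String gate) {
            if (!enqueuedGates.add(gate)) {
                return; // already enqueued
            }
            gatesWithData.add(gate);
        }

        synchronized String takeGate() {
            String gate = gatesWithData.poll();
            if (gate != null) {
                enqueuedGates.remove(gate);
            }
            return gate;
        }

        public static void main(String[] args) {
            EnqueueOnceSketch sketch = new EnqueueOnceSketch();
            sketch.queueGate("gate-1");
            sketch.queueGate("gate-1"); // duplicate notification, ignored
            sketch.queueGate("gate-2");
            System.out.println(sketch.takeGate()); // gate-1
            System.out.println(sketch.takeGate()); // gate-2
            System.out.println(sketch.takeGate()); // null, queue is drained
        }
    }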

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index d887ab6..1101f66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -76,7 +77,7 @@ class UnknownInputChannel extends InputChannel {
        }
 
        @Override
-       public BufferAndAvailability getNextBuffer() throws IOException {
+       public Optional<BufferAndAvailability> getNextBuffer() throws 
IOException {
                // Nothing to do here
                throw new UnsupportedOperationException("Cannot retrieve a 
buffer from an UnknownInputChannel");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index 3526e96..382ae39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.runtime.operators.shipping;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * The OutputCollector collects records, and emits them to the  {@link 
RecordWriter}s.
  * The OutputCollector tracks to which writers a deep-copy must be given and 
which not.
@@ -81,11 +81,8 @@ public class OutputCollector<T> implements Collector<T> {
        @Override
        public void close() {
                for (RecordWriter<?> writer : writers) {
-                       try {
-                               writer.flush();
-                       } catch (IOException e) {
-                               throw new RuntimeException(e.getMessage(), e);
-                       }
+                       writer.clearBuffers();
+                       writer.flush();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 5a7d20a..b2171c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -32,6 +34,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * {@link ResultPartitionWriter} that collects output on the List.
  */
+@ThreadSafe
 public abstract class AbstractCollectingResultPartitionWriter implements 
ResultPartitionWriter {
        private final BufferProvider bufferProvider;
        private final ArrayDeque<BufferConsumer> bufferConsumers = new 
ArrayDeque<>();
@@ -61,13 +64,15 @@ public abstract class 
AbstractCollectingResultPartitionWriter implements ResultP
        }
 
        @Override
-       public void addBufferConsumer(BufferConsumer bufferConsumer, int 
targetChannel) throws IOException {
+       public synchronized void addBufferConsumer(BufferConsumer 
bufferConsumer, int targetChannel) throws IOException {
                checkState(targetChannel < getNumberOfSubpartitions());
-
                bufferConsumers.add(bufferConsumer);
+               processBufferConsumers();
+       }
 
+       private void processBufferConsumers() throws IOException {
                while (!bufferConsumers.isEmpty()) {
-                       bufferConsumer = bufferConsumers.peek();
+                       BufferConsumer bufferConsumer = bufferConsumers.peek();
                        Buffer buffer = bufferConsumer.build();
                        try {
                                deserializeBuffer(buffer);
@@ -82,5 +87,14 @@ public abstract class 
AbstractCollectingResultPartitionWriter implements ResultP
                }
        }
 
+       @Override
+       public synchronized void flush() {
+               try {
+                       processBufferConsumers();
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
        protected abstract void deserializeBuffer(Buffer buffer) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 95d6655..ed32454 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -62,10 +62,7 @@ import java.util.concurrent.Future;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -180,7 +177,6 @@ public class RecordWriterTest {
 
                // Fill a buffer, but don't write it out.
                recordWriter.emit(new IntValue(0));
-               verify(partitionWriter, 
never()).addBufferConsumer(any(BufferConsumer.class), anyInt());
 
                // Clear all buffers.
                recordWriter.clearBuffers();
@@ -428,6 +424,10 @@ public class RecordWriterTest {
                public void addBufferConsumer(BufferConsumer buffer, int 
targetChannel) throws IOException {
                        queues[targetChannel].add(buffer);
                }
+
+               @Override
+               public void flush() {
+               }
        }
 
        private static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, 
int targetChannel) throws IOException {
@@ -477,6 +477,10 @@ public class RecordWriterTest {
                public void addBufferConsumer(BufferConsumer bufferConsumer, 
int targetChannel) throws IOException {
                        bufferConsumer.close();
                }
+
+               @Override
+               public void flush() {
+               }
        }
 
        private static class ByteArrayIO implements IOReadableWritable {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index c6b8599..ead42df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -30,6 +30,10 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 public class BufferBuilderTestUtils {
        public static final int BUFFER_SIZE = 32 * 1024;
 
+       public static BufferBuilder createBufferBuilder() {
+               return createBufferBuilder(BUFFER_SIZE);
+       }
+
        public static BufferBuilder createBufferBuilder(int size) {
                return createFilledBufferBuilder(size, 0);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 4c4939b..56abff1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
@@ -88,7 +87,7 @@ public class CancelPartitionRequestTest {
                                        @Override
                                        public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                                BufferAvailabilityListener 
listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
-                                               
listener.notifyBuffersAvailable(Long.MAX_VALUE);
+                                               listener.notifyDataAvailable();
                                                return view;
                                        }
                                });
@@ -139,7 +138,7 @@ public class CancelPartitionRequestTest {
                                                @Override
                                                public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                                        
BufferAvailabilityListener listener = (BufferAvailabilityListener) 
invocationOnMock.getArguments()[2];
-                                                       
listener.notifyBuffersAvailable(Long.MAX_VALUE);
+                                                       
listener.notifyDataAvailable();
                                                        return view;
                                                }
                                        });
@@ -194,11 +193,11 @@ public class CancelPartitionRequestTest {
                public BufferAndBacklog getNextBuffer() throws IOException, 
InterruptedException {
                        Buffer buffer = bufferProvider.requestBufferBlocking();
                        buffer.setSize(buffer.getMaxCapacity()); // fake some 
data
-                       return new BufferAndBacklog(buffer, 0, false);
+                       return new BufferAndBacklog(buffer, true, 0, false);
                }
 
                @Override
-               public void notifyBuffersAvailable(long buffers) {
+               public void notifyDataAvailable() {
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 69a0e11..16418ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
@@ -52,6 +53,43 @@ import static org.junit.Assert.assertTrue;
  */
 public class PartitionRequestQueueTest {
 
+       /**
+        * When an empty reader and a reader that actually has some buffers are 
enqueued while the channel is not writable,
+        * a channel writability change event should result in reading all of 
the messages.
+        */
+       @Test
+       public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
+               final int buffersToWrite = 5;
+               PartitionRequestQueue queue = new PartitionRequestQueue();
+               EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+               CreditBasedSequenceNumberingViewReader reader1 = new 
CreditBasedSequenceNumberingViewReader(new InputChannelID(0, 0), 10, queue);
+               CreditBasedSequenceNumberingViewReader reader2 = new 
CreditBasedSequenceNumberingViewReader(new InputChannelID(1, 1), 10, queue);
+
+               reader1.requestSubpartitionView((partitionId, index, 
availabilityListener) -> new NotReleasedResultSubpartitionView(), new 
ResultPartitionID(), 0);
+               reader1.notifyDataAvailable();
+               assertTrue(reader1.isAvailable());
+               assertFalse(reader1.isRegisteredAsAvailable());
+
+               channel.unsafe().outboundBuffer().setUserDefinedWritability(1, 
false);
+               assertFalse(channel.isWritable());
+
+               reader1.notifyDataAvailable();
+               channel.runPendingTasks();
+
+               reader2.notifyDataAvailable();
+               reader2.requestSubpartitionView((partitionId, index, 
availabilityListener) -> new 
DefaultBufferResultSubpartitionView(buffersToWrite), new ResultPartitionID(), 
0);
+               assertTrue(reader2.isAvailable());
+               assertFalse(reader2.isRegisteredAsAvailable());
+
+               reader2.notifyDataAvailable();
+
+               // changing the channel writability should result in draining 
both reader1 and reader2
+               channel.unsafe().outboundBuffer().setUserDefinedWritability(1, 
true);
+               channel.runPendingTasks();
+               assertEquals(buffersToWrite, channel.outboundMessages().size());
+       }
+
        @Test
        public void testProducerFailedException() throws Exception {
                PartitionRequestQueue queue = new PartitionRequestQueue();
@@ -66,7 +104,7 @@ public class PartitionRequestQueueTest {
                CreditBasedSequenceNumberingViewReader seqView = new 
CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
                seqView.requestSubpartitionView(partitionProvider, new 
ResultPartitionID(), 0);
                // Add available buffer to trigger enqueue the erroneous view
-               seqView.notifyBuffersAvailable(1);
+               seqView.notifyDataAvailable();
 
                ch.runPendingTasks();
 
@@ -84,7 +122,7 @@ public class PartitionRequestQueueTest {
         */
        @Test
        public void testDefaultBufferWriting() throws Exception {
-               testBufferWriting(new DefaultBufferResultSubpartitionView(2));
+               testBufferWriting(new DefaultBufferResultSubpartitionView(1));
        }
 
        /**
@@ -92,7 +130,7 @@ public class PartitionRequestQueueTest {
         */
        @Test
        public void testReadOnlyBufferWriting() throws Exception {
-               testBufferWriting(new ReadOnlyBufferResultSubpartitionView(2));
+               testBufferWriting(new ReadOnlyBufferResultSubpartitionView(1));
        }
 
        private void testBufferWriting(ResultSubpartitionView view) throws 
IOException {
@@ -108,7 +146,7 @@ public class PartitionRequestQueueTest {
                reader.requestSubpartitionView(partitionProvider, new 
ResultPartitionID(), 0);
 
                // notify about buffer availability and encode one buffer
-               reader.notifyBuffersAvailable(1);
+               reader.notifyDataAvailable();
 
                channel.runPendingTasks();
 
@@ -124,37 +162,45 @@ public class PartitionRequestQueueTest {
 
        private static class DefaultBufferResultSubpartitionView extends 
NoOpResultSubpartitionView {
                /** Number of buffers in the backlog to report with every {@link 
#getNextBuffer()} call. */
-               final int buffersInBacklog;
+               private final AtomicInteger buffersInBacklog;
 
                private DefaultBufferResultSubpartitionView(int 
buffersInBacklog) {
-                       this.buffersInBacklog = buffersInBacklog;
+                       this.buffersInBacklog = new 
AtomicInteger(buffersInBacklog);
                }
 
                @Nullable
                @Override
                public BufferAndBacklog getNextBuffer() {
+                       int buffers = buffersInBacklog.decrementAndGet();
                        return new BufferAndBacklog(
                                TestBufferFactory.createBuffer(10),
-                               buffersInBacklog,
+                               buffers > 0,
+                               buffers,
                                false);
                }
        }
 
-       private static class ReadOnlyBufferResultSubpartitionView extends 
NoOpResultSubpartitionView {
-               /** Number of buffer in the backlog to report with every {@link 
#getNextBuffer()} call. */
-               final int buffersInBacklog;
-
+       private static class ReadOnlyBufferResultSubpartitionView extends 
DefaultBufferResultSubpartitionView {
                private ReadOnlyBufferResultSubpartitionView(int 
buffersInBacklog) {
-                       this.buffersInBacklog = buffersInBacklog;
+                       super(buffersInBacklog);
                }
 
                @Nullable
                @Override
                public BufferAndBacklog getNextBuffer() {
+                       BufferAndBacklog nextBuffer = super.getNextBuffer();
                        return new BufferAndBacklog(
-                               
TestBufferFactory.createBuffer(10).readOnlySlice(),
-                               buffersInBacklog,
-                               false);
+                               nextBuffer.buffer().readOnlySlice(),
+                               nextBuffer.isMoreAvailable(),
+                               nextBuffer.buffersInBacklog(),
+                               nextBuffer.nextBufferIsEvent());
+               }
+       }
+
+       private static class NotReleasedResultSubpartitionView extends 
NoOpResultSubpartitionView {
+               @Override
+               public boolean isReleased() {
+                       return false;
                }
        }
 
@@ -195,7 +241,7 @@ public class PartitionRequestQueueTest {
                assertNull(channel.readOutbound());
 
                // Notify an available event buffer to trigger enqueue the 
reader
-               reader.notifyBuffersAvailable(1);
+               reader.notifyDataAvailable();
 
                channel.runPendingTasks();
 
@@ -226,7 +272,7 @@ public class PartitionRequestQueueTest {
        @Test
        public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
                // setup
-               final ResultSubpartitionView view = new 
DefaultBufferResultSubpartitionView(2);
+               final ResultSubpartitionView view = new 
DefaultBufferResultSubpartitionView(10);
 
                ResultPartitionProvider partitionProvider =
                        (partitionId, index, availabilityListener) -> view;
@@ -246,7 +292,7 @@ public class PartitionRequestQueueTest {
                // Notify available buffers to trigger enqueue the reader
                final int notifyNumBuffers = 5;
                for (int i = 0; i < notifyNumBuffers; i++) {
-                       reader.notifyBuffersAvailable(1);
+                       reader.notifyDataAvailable();
                }
 
                channel.runPendingTasks();
@@ -254,7 +300,7 @@ public class PartitionRequestQueueTest {
                // the reader is not enqueued in the pipeline because no 
credits are available
                // -> it should still have the same number of pending buffers
                assertEquals(0, queue.getAvailableReaders().size());
-               assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
+               assertTrue(reader.hasBuffersAvailable());
                assertFalse(reader.isRegisteredAsAvailable());
                assertEquals(0, reader.getNumCreditsAvailable());
 
@@ -269,7 +315,7 @@ public class PartitionRequestQueueTest {
                        assertTrue(reader.isRegisteredAsAvailable());
                        assertThat(queue.getAvailableReaders(), 
contains(reader)); // contains only (this) one!
                        assertEquals(i, reader.getNumCreditsAvailable());
-                       assertEquals(notifyNumBuffers, 
reader.getNumBuffersAvailable());
+                       assertTrue(reader.hasBuffersAvailable());
                }
 
                // Flush the buffer to make the channel writable again and see 
the final results
@@ -278,7 +324,7 @@ public class PartitionRequestQueueTest {
 
                assertEquals(0, queue.getAvailableReaders().size());
                assertEquals(0, reader.getNumCreditsAvailable());
-               assertEquals(notifyNumBuffers - notifyNumCredits, 
reader.getNumBuffersAvailable());
+               assertTrue(reader.hasBuffersAvailable());
                assertFalse(reader.isRegisteredAsAvailable());
                for (int i = 1; i <= notifyNumCredits; i++) {
                        assertThat(channel.readOutbound(), 
instanceOf(NettyMessage.BufferResponse.class));
@@ -316,7 +362,7 @@ public class PartitionRequestQueueTest {
                }
 
                @Override
-               public void notifyBuffersAvailable(long buffers) {
+               public void notifyDataAvailable() {
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 8646168..5360041 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -68,7 +68,7 @@ public class ServerTransportErrorHandlingTest {
                                @Override
                                public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                        BufferAvailabilityListener listener = 
(BufferAvailabilityListener) invocationOnMock.getArguments()[2];
-                                       
listener.notifyBuffersAvailable(Long.MAX_VALUE);
+                                       listener.notifyDataAvailable();
                                        return new 
CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
                                }
                        });

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
new file mode 100644
index 0000000..2b6b834
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Test implementation of {@link BufferAvailabilityListener}.
+ */
+class AwaitableBufferAvailablityListener implements BufferAvailabilityListener 
{
+
+       private long numNotifications;
+
+       @Override
+       public void notifyDataAvailable() {
+               ++numNotifications;
+       }
+
+       public long getNumNotifications() {
+               return numNotifications;
+       }
+
+       public void resetNotificationCounters() {
+               numNotifications = 0;
+       }
+
+       void awaitNotifications(long awaitedNumNotifications, long 
timeoutMillis) throws InterruptedException {
+               long deadline = System.currentTimeMillis() + timeoutMillis;
+               while (numNotifications < awaitedNumNotifications && 
System.currentTimeMillis() < deadline) {
+                       Thread.sleep(1);
+               }
+       }
+}
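
A possible usage pattern for such a listener in a test: fire notifications from another thread and wait, with a deadline, until the expected count has arrived. The sketch below uses a local copy of the helper (with a volatile counter for cross-thread visibility), not the package-private class above.

    public class AwaitListenerSketch {

        static class CountingListener {
            // volatile so the waiting thread sees updates; only one thread writes
            volatile long numNotifications;

            void notifyDataAvailable() {
                ++numNotifications;
            }

            void awaitNotifications(long expected, long timeoutMillis) throws InterruptedException {
                long deadline = System.currentTimeMillis() + timeoutMillis;
                while (numNotifications < expected && System.currentTimeMillis() < deadline) {
                    Thread.sleep(1);
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            CountingListener listener = new CountingListener();

            Thread producer = new Thread(() -> {
                for (int i = 0; i < 3; i++) {
                    listener.notifyDataAvailable();
                }
            });
            producer.start();

            listener.awaitNotifications(3, 1000);
            producer.join();
            System.out.println("received " + listener.numNotifications + " notifications");
        }
    }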

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
new file mode 100644
index 0000000..4162975
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpBufferAvailablityListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Test implementation of {@link BufferAvailabilityListener}.
+ */
+class NoOpBufferAvailablityListener implements BufferAvailabilityListener {
+       @Override
+       public void notifyDataAvailable() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b1e127f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 76e6f2c..ced1a33 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -123,8 +123,8 @@ public class PartialConsumePipelinedResultTest extends 
TestLogger {
                        for (int i = 0; i < 8; i++) {
                                final BufferBuilder bufferBuilder = 
writer.getBufferProvider().requestBufferBuilderBlocking();
                                
writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
-
                                Thread.sleep(50);
+                               bufferBuilder.finish();
                        }
                }
        }
