[FLINK-5169] [network] Make consumption of InputChannels fair

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cfce175
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cfce175
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cfce175

Branch: refs/heads/release-1.1
Commit: 6cfce17588051281226de9fdff65dcd2476c1460
Parents: e2c53cf
Author: Stephan Ewen <[email protected]>
Authored: Mon Nov 28 09:59:29 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Nov 28 21:05:00 2016 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/BufferReader.java     |  56 ---
 .../io/network/netty/PartitionRequestQueue.java | 255 +++++-------
 .../netty/PartitionRequestServerHandler.java    |  40 +-
 .../netty/SequenceNumberingViewReader.java      | 130 +++++++
 .../partition/BufferAvailabilityListener.java   |  33 ++
 .../partition/PipelinedSubpartition.java        | 148 ++++---
 .../partition/PipelinedSubpartitionView.java    |  18 +-
 .../io/network/partition/ResultPartition.java   |   9 +-
 .../partition/ResultPartitionManager.java       |   5 +-
 .../partition/ResultPartitionProvider.java      |   3 +-
 .../network/partition/ResultSubpartition.java   |   6 +-
 .../partition/ResultSubpartitionView.java       |   9 +-
 .../partition/SpillableSubpartition.java        | 177 +++++----
 .../partition/SpillableSubpartitionView.java    | 210 +++++-----
 .../partition/SpilledSubpartitionView.java      | 223 +++++++++++
 .../SpilledSubpartitionViewAsyncIO.java         | 383 -------------------
 .../SpilledSubpartitionViewSyncIO.java          | 196 ----------
 .../partition/consumer/BufferOrEvent.java       |  25 +-
 .../partition/consumer/InputChannel.java        |  43 ++-
 .../network/partition/consumer/InputGate.java   |   3 +-
 .../partition/consumer/InputGateListener.java   |  35 ++
 .../partition/consumer/LocalInputChannel.java   | 210 +++++-----
 .../partition/consumer/RemoteInputChannel.java  |  80 ++--
 .../partition/consumer/SingleInputGate.java     | 104 +++--
 .../partition/consumer/UnionInputGate.java      |  98 ++---
 .../partition/consumer/UnknownInputChannel.java |   9 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 -
 27 files changed, 1147 insertions(+), 1362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
deleted file mode 100644
index debb352..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-import java.io.IOException;
-
-/**
- * A buffer-oriented reader.
- */
-public final class BufferReader extends AbstractReader {
-
-       public BufferReader(InputGate gate) {
-               super(gate);
-       }
-
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
-               while (true) {
-                       final BufferOrEvent bufferOrEvent = 
inputGate.getNextBufferOrEvent();
-
-                       if (bufferOrEvent.isBuffer()) {
-                               return bufferOrEvent.getBuffer();
-                       }
-                       else {
-                               if (handleEvent(bufferOrEvent.getEvent())) {
-                                       return null;
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 094c9c7..dc80675 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -27,10 +27,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
-import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +39,10 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.Set;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 
 /**
- * A queue of partition queues, which listens for channel writability changed
+ * A queue of non-empty partition readers, which listens for channel writability 
changed
  * events before writing and flushing {@link Buffer} instances.
  */
 class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
@@ -52,12 +51,10 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
 
        private final ChannelFutureListener writeListener = new 
WriteAndFlushNextMessageIfPossibleListener();
 
-       private final Queue<SequenceNumberingSubpartitionView> queue = new 
ArrayDeque<SequenceNumberingSubpartitionView>();
+       private final Queue<SequenceNumberingViewReader> nonEmptyReader = new 
ArrayDeque<>();
 
        private final Set<InputChannelID> released = Sets.newHashSet();
 
-       private SequenceNumberingSubpartitionView currentPartitionQueue;
-
        private boolean fatalError;
 
        private ChannelHandlerContext ctx;
@@ -71,8 +68,22 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                super.channelRegistered(ctx);
        }
 
-       public void enqueue(ResultSubpartitionView partitionQueue, 
InputChannelID receiverId) throws Exception {
-               ctx.pipeline().fireUserEventTriggered(new 
SequenceNumberingSubpartitionView(partitionQueue, receiverId));
+       void notifyReaderNonEmpty(final SequenceNumberingViewReader reader) {
+               // The notification might come from the same thread. For the 
initial writes this
+               // might happen before the reader has set its reference to the 
view, because
+               // creating the queue and the initial notification happen in 
the same method call.
+               // This can be resolved by separating the creation of the view 
and allowing
+               // notifications.
+
+               // TODO This could potentially have a bad performance impact as 
in the
+               // worst case (network consumes faster than the producer) each 
buffer
+               // will trigger a separate event loop task being scheduled.
+               ctx.executor().execute(new Runnable() {
+                       @Override
+                       public void run() {
+                               ctx.pipeline().fireUserEventTriggered(reader);
+                       }
+               });
        }
 
        public void cancel(InputChannelID receiverId) {
@@ -87,45 +98,37 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
 
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object msg) 
throws Exception {
-               if (msg.getClass() == SequenceNumberingSubpartitionView.class) {
-                       boolean triggerWrite = queue.isEmpty();
-
-                       queue.add((SequenceNumberingSubpartitionView) msg);
-
+               // The user event triggered event loop callback is used for 
thread-safe
+               // hand over of reader queues and cancelled producers.
+
+               if (msg.getClass() == SequenceNumberingViewReader.class) {
+                       // Queue a non-empty reader for consumption. If the 
queue
+                       // is empty, we try to trigger the actual write. Otherwise 
this
+                       // will be handled by the writeAndFlushNextMessageIfPossible calls.
+                       boolean triggerWrite = nonEmptyReader.isEmpty();
+                       nonEmptyReader.add((SequenceNumberingViewReader) msg);
                        if (triggerWrite) {
                                
writeAndFlushNextMessageIfPossible(ctx.channel());
                        }
-               }
-               else if (msg.getClass() == InputChannelID.class) {
+               } else if (msg.getClass() == InputChannelID.class) {
+                       // Release the partition view that got a cancel request.
                        InputChannelID toCancel = (InputChannelID) msg;
-
                        if (released.contains(toCancel)) {
                                return;
                        }
 
                        // Cancel the request for the input channel
-                       if (currentPartitionQueue != null && 
currentPartitionQueue.getReceiverId().equals(toCancel)) {
-                               currentPartitionQueue.releaseAllResources();
-                               
markAsReleased(currentPartitionQueue.receiverId);
-                               currentPartitionQueue = null;
-                       }
-                       else {
-                               int size = queue.size();
-
-                               for (int i = 0; i < size; i++) {
-                                       SequenceNumberingSubpartitionView curr 
= queue.poll();
-
-                                       if 
(curr.getReceiverId().equals(toCancel)) {
-                                               curr.releaseAllResources();
-                                               markAsReleased(curr.receiverId);
-                                       }
-                                       else {
-                                               queue.add(curr);
-                                       }
+                       int size = nonEmptyReader.size();
+                       for (int i = 0; i < size; i++) {
+                               SequenceNumberingViewReader reader = 
nonEmptyReader.poll();
+                               if (reader.getReceiverId().equals(toCancel)) {
+                                       reader.releaseAllResources();
+                                       markAsReleased(reader.getReceiverId());
+                               } else {
+                                       nonEmptyReader.add(reader);
                                }
                        }
-               }
-               else {
+               } else {
                        ctx.fireUserEventTriggered(msg);
                }
        }
@@ -140,64 +143,84 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                        return;
                }
 
-               Buffer buffer = null;
+               // The logic here is very similar to the combined input gate 
and local
+               // input channel logic. You can think of this class acting as 
the input
+               // gate and the consumed views as the local input channels.
 
+               BufferAndAvailability next = null;
                try {
                        if (channel.isWritable()) {
                                while (true) {
-                                       if (currentPartitionQueue == null && 
(currentPartitionQueue = queue.poll()) == null) {
+                                       SequenceNumberingViewReader reader = 
nonEmptyReader.poll();
+
+                                       // No queue with available data. We 
allow this here, because
+                                       // of the write callbacks that are 
executed after each write.
+                                       if (reader == null) {
                                                return;
                                        }
 
-                                       buffer = 
currentPartitionQueue.getNextBuffer();
+                                       next = reader.getNextBuffer();
 
-                                       if (buffer == null) {
-                                               if 
(currentPartitionQueue.registerListener(null)) {
-                                                       currentPartitionQueue = 
null;
-                                               }
-                                               else if 
(currentPartitionQueue.isReleased()) {
-                                                       
markAsReleased(currentPartitionQueue.getReceiverId());
-
-                                                       Throwable cause = 
currentPartitionQueue.getFailureCause();
+                                       if (next == null) {
+                                               if (reader.isReleased()) {
+                                                       
markAsReleased(reader.getReceiverId());
+                                                       Throwable cause = 
reader.getFailureCause();
 
                                                        if (cause != null) {
-                                                               
ctx.writeAndFlush(new NettyMessage.ErrorResponse(
-                                                                               
new ProducerFailedException(cause),
-                                                                               
currentPartitionQueue.receiverId));
-                                                       }
+                                                               ErrorResponse 
msg = new ErrorResponse(
+                                                                       new 
ProducerFailedException(cause),
+                                                                       
reader.getReceiverId());
 
-                                                       currentPartitionQueue = 
null;
+                                                               
ctx.writeAndFlush(msg);
+                                                       }
+                                               } else {
+                                                       IllegalStateException 
err = new IllegalStateException(
+                                                               "Bug in Netty 
consumer logic: reader queue got notified by partition " +
+                                                                       "about 
available data, but none was available.");
+                                                       
handleException(ctx.channel(), err);
+                                                       return;
+                                               }
+                                       } else {
+                                               // this channel was now removed 
from the non-empty reader queue
+                                               // we re-add it in case it has 
more data, because in that case no
+                                               // "non-empty" notification 
will come for that reader from the queue.
+                                               if (next.moreAvailable()) {
+                                                       
nonEmptyReader.add(reader);
                                                }
-                                       }
-                                       else {
-                                               BufferResponse resp = new 
BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), 
currentPartitionQueue.getReceiverId());
 
-                                               if (!buffer.isBuffer() &&
-                                                               
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == 
EndOfPartitionEvent.class) {
+                                               BufferResponse msg = new 
BufferResponse(
+                                                       next.buffer(),
+                                                       
reader.getSequenceNumber(),
+                                                       reader.getReceiverId());
 
-                                                       
currentPartitionQueue.notifySubpartitionConsumed();
-                                                       
currentPartitionQueue.releaseAllResources();
-                                                       
markAsReleased(currentPartitionQueue.getReceiverId());
+                                               if 
(isEndOfPartitionEvent(next.buffer())) {
+                                                       
reader.notifySubpartitionConsumed();
+                                                       
reader.releaseAllResources();
 
-                                                       currentPartitionQueue = 
null;
+                                                       
markAsReleased(reader.getReceiverId());
                                                }
 
-                                               
channel.writeAndFlush(resp).addListener(writeListener);
+                                               // Write and flush and wait 
until this is done before
+                                               // trying to continue with the 
next buffer.
+                                               
channel.writeAndFlush(msg).addListener(writeListener);
 
                                                return;
                                        }
                                }
                        }
-               }
-               catch (Throwable t) {
-                       if (buffer != null) {
-                               buffer.recycle();
+               } catch (Throwable t) {
+                       if (next != null) {
+                               next.buffer().recycle();
                        }
 
                        throw new IOException(t.getMessage(), t);
                }
        }
 
+       private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException 
{
+               return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, 
getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class;
+       }
+
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
                releaseAllResources();
@@ -215,22 +238,15 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                releaseAllResources();
 
                if (channel.isActive()) {
-                       channel.writeAndFlush(new 
NettyMessage.ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
+                       channel.writeAndFlush(new 
ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
                }
        }
 
        private void releaseAllResources() throws IOException {
-               if (currentPartitionQueue != null) {
-                       currentPartitionQueue.releaseAllResources();
-                       markAsReleased(currentPartitionQueue.getReceiverId());
-
-                       currentPartitionQueue = null;
-               }
-
-               while ((currentPartitionQueue = queue.poll()) != null) {
-                       currentPartitionQueue.releaseAllResources();
-
-                       markAsReleased(currentPartitionQueue.getReceiverId());
+               SequenceNumberingViewReader reader;
+               while ((reader = nonEmptyReader.poll()) != null) {
+                       reader.releaseAllResources();
+                       markAsReleased(reader.getReceiverId());
                }
        }
 
@@ -241,7 +257,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                released.add(receiverId);
        }
 
-       // This listener is called after an element of the current queue has 
been
+       // This listener is called after an element of the current 
reader queue has been
        // flushed. If successful, the listener triggers further processing of 
the
        // queues.
        private class WriteAndFlushNextMessageIfPossibleListener implements 
ChannelFutureListener {
@@ -251,87 +267,14 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                        try {
                                if (future.isSuccess()) {
                                        
writeAndFlushNextMessageIfPossible(future.channel());
-                               }
-                               else if (future.cause() != null) {
+                               } else if (future.cause() != null) {
                                        handleException(future.channel(), 
future.cause());
-                               }
-                               else {
+                               } else {
                                        handleException(future.channel(), new 
IllegalStateException("Sending cancelled by user."));
                                }
-                       }
-                       catch (Throwable t) {
+                       } catch (Throwable t) {
                                handleException(future.channel(), t);
                        }
                }
        }
-
-       /**
-        * Simple wrapper for the partition queue iterator, which increments a
-        * sequence number for each returned buffer and remembers the receiver 
ID.
-        */
-       private class SequenceNumberingSubpartitionView implements 
ResultSubpartitionView, NotificationListener {
-
-               private final ResultSubpartitionView queueIterator;
-
-               private final InputChannelID receiverId;
-
-               private int sequenceNumber = -1;
-
-               private 
SequenceNumberingSubpartitionView(ResultSubpartitionView queueIterator, 
InputChannelID receiverId) {
-                       this.queueIterator = checkNotNull(queueIterator);
-                       this.receiverId = checkNotNull(receiverId);
-               }
-
-               private InputChannelID getReceiverId() {
-                       return receiverId;
-               }
-
-               private int getSequenceNumber() {
-                       return sequenceNumber;
-               }
-
-               @Override
-               public Buffer getNextBuffer() throws IOException, 
InterruptedException {
-                       Buffer buffer = queueIterator.getNextBuffer();
-
-                       if (buffer != null) {
-                               sequenceNumber++;
-                       }
-
-                       return buffer;
-               }
-
-               @Override
-               public void notifySubpartitionConsumed() throws IOException {
-                       queueIterator.notifySubpartitionConsumed();
-               }
-
-               @Override
-               public boolean isReleased() {
-                       return queueIterator.isReleased();
-               }
-
-               @Override
-               public Throwable getFailureCause() {
-                       return queueIterator.getFailureCause();
-               }
-
-               @Override
-               public boolean registerListener(NotificationListener ignored) 
throws IOException {
-                       return queueIterator.registerListener(this);
-               }
-
-               @Override
-               public void releaseAllResources() throws IOException {
-                       queueIterator.releaseAllResources();
-               }
-
-               /**
-                * Enqueue this iterator again after a notification.
-                */
-               @Override
-               public void onNotification() {
-                       ctx.pipeline().fireUserEventTriggered(this);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index e278d07..12b52ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionReq
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,10 +52,10 @@ class PartitionRequestServerHandler extends 
SimpleChannelInboundHandler<NettyMes
        private BufferPool bufferPool;
 
        PartitionRequestServerHandler(
-                       ResultPartitionProvider partitionProvider,
-                       TaskEventDispatcher taskEventDispatcher,
-                       PartitionRequestQueue outboundQueue,
-                       NetworkBufferPool networkBufferPool) {
+               ResultPartitionProvider partitionProvider,
+               TaskEventDispatcher taskEventDispatcher,
+               PartitionRequestQueue outboundQueue,
+               NetworkBufferPool networkBufferPool) {
 
                this.partitionProvider = partitionProvider;
                this.taskEventDispatcher = taskEventDispatcher;
@@ -94,15 +93,16 @@ class PartitionRequestServerHandler extends 
SimpleChannelInboundHandler<NettyMes
                                LOG.debug("Read channel on {}: {}.", 
ctx.channel().localAddress(), request);
 
                                try {
-                                       ResultSubpartitionView subpartition =
-                                                       
partitionProvider.createSubpartitionView(
-                                                                       
request.partitionId,
-                                                                       
request.queueIndex,
-                                                                       
bufferPool);
-
-                                       outboundQueue.enqueue(subpartition, 
request.receiverId);
-                               }
-                               catch (PartitionNotFoundException notFound) {
+                                       SequenceNumberingViewReader reader = 
new SequenceNumberingViewReader(
+                                               request.receiverId,
+                                               outboundQueue);
+
+                                       reader.requestSubpartitionView(
+                                               partitionProvider,
+                                               request.partitionId,
+                                               request.queueIndex,
+                                               bufferPool);
+                               } catch (PartitionNotFoundException notFound) {
                                        respondWithError(ctx, notFound, 
request.receiverId);
                                }
                        }
@@ -115,20 +115,16 @@ class PartitionRequestServerHandler extends 
SimpleChannelInboundHandler<NettyMes
                                if 
(!taskEventDispatcher.publish(request.partitionId, request.event)) {
                                        respondWithError(ctx, new 
IllegalArgumentException("Task event receiver not found."), request.receiverId);
                                }
-                       }
-                       else if (msgClazz == CancelPartitionRequest.class) {
+                       } else if (msgClazz == CancelPartitionRequest.class) {
                                CancelPartitionRequest request = 
(CancelPartitionRequest) msg;
 
                                outboundQueue.cancel(request.receiverId);
-                       }
-                       else if (msgClazz == CloseRequest.class) {
+                       } else if (msgClazz == CloseRequest.class) {
                                outboundQueue.close();
-                       }
-                       else {
+                       } else {
                                LOG.warn("Received unexpected client request: 
{}", msg);
                        }
-               }
-               catch (Throwable t) {
+               } catch (Throwable t) {
                        respondWithError(ctx, t);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
new file mode 100644
index 0000000..ef611eb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple wrapper for a {@link ResultSubpartitionView}, which increments a
+ * sequence number for each returned buffer and remembers the receiver ID.
+ *
+ * <p>It also keeps track of available buffers and notifies the outbound
+ * handler about non-emptiness, similar to the {@link LocalInputChannel}.
+ */
+class SequenceNumberingViewReader implements BufferAvailabilityListener {
+
+       private final Object requestLock = new Object();
+
+       private final InputChannelID receiverId;
+
+       private final AtomicLong numBuffersAvailable = new AtomicLong();
+
+       private final PartitionRequestQueue requestQueue;
+
+       private volatile ResultSubpartitionView subpartitionView;
+
+       private int sequenceNumber = -1;
+
+       SequenceNumberingViewReader(InputChannelID receiverId, 
PartitionRequestQueue requestQueue) {
+               this.receiverId = receiverId;
+               this.requestQueue = requestQueue;
+       }
+
+       void requestSubpartitionView(
+               ResultPartitionProvider partitionProvider,
+               ResultPartitionID resultPartitionId,
+               int subPartitionIndex,
+               BufferProvider bufferProvider) throws IOException {
+
+               synchronized (requestLock) {
+                       if (subpartitionView == null) {
+                               // Since this call can trigger a notification, we 
have to
+                               // schedule a separate task at the event loop 
that will
+                               // start consuming this. Otherwise the 
reference to the
+                               // view cannot be available in getNextBuffer().
+                               this.subpartitionView = 
partitionProvider.createSubpartitionView(
+                                       resultPartitionId,
+                                       subPartitionIndex,
+                                       bufferProvider,
+                                       this);
+                       } else {
+                               throw new IllegalStateException("Subpartition 
already requested");
+                       }
+               }
+       }
+
+       InputChannelID getReceiverId() {
+               return receiverId;
+       }
+
+       int getSequenceNumber() {
+               return sequenceNumber;
+       }
+
+       public BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
+               Buffer next = subpartitionView.getNextBuffer();
+               if (next != null) {
+                       long remaining = numBuffersAvailable.decrementAndGet();
+                       sequenceNumber++;
+
+                       if (remaining >= 0) {
+                               return new BufferAndAvailability(next, 
remaining > 0);
+                       } else {
+                               throw new IllegalStateException("no buffer 
available");
+                       }
+               } else {
+                       return null;
+               }
+       }
+
+       public void notifySubpartitionConsumed() throws IOException {
+               subpartitionView.notifySubpartitionConsumed();
+       }
+
+       public boolean isReleased() {
+               return subpartitionView.isReleased();
+       }
+
+       public Throwable getFailureCause() {
+               return subpartitionView.getFailureCause();
+       }
+
+       public void releaseAllResources() throws IOException {
+               subpartitionView.releaseAllResources();
+       }
+
+       @Override
+       public void notifyBuffersAvailable(long numBuffers) {
+               // if these buffers made the reader non-empty, notify the request 
queue
+               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
+                       requestQueue.notifyReaderNonEmpty(this);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
new file mode 100644
index 0000000..114ef7c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Listener interface implemented by consumers of {@link 
ResultSubpartitionView}
+ * that want to be notified of availability of further buffers.
+ */
+public interface BufferAvailabilityListener {
+
+       /**
+        * Called whenever a new number of buffers becomes available.
+        *
+        * @param numBuffers The number of buffers that became available.
+        */
+       void notifyBuffersAvailable(long numBuffers);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index b703acb..4d5e378 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
@@ -38,51 +39,47 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedSubpartition.class);
 
+       // 
------------------------------------------------------------------------
+
+       /** All buffers of this subpartition. Access to the buffers is 
synchronized on this object. */
+       private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
+
+       /** The read view to consume this subpartition. */
+       private PipelinedSubpartitionView readView;
+
        /** Flag indicating whether the subpartition has been finished. */
        private boolean isFinished;
 
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
 
-       /**
-        * A data availability listener. Registered, when the consuming task is 
faster than the
-        * producing task.
-        */
-       private NotificationListener registeredListener;
-
-       /** The read view to consume this subpartition. */
-       private PipelinedSubpartitionView readView;
-
-       /** All buffers of this subpartition. Access to the buffers is 
synchronized on this object. */
-       final ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>();
+       // 
------------------------------------------------------------------------
 
        PipelinedSubpartition(int index, ResultPartition parent) {
                super(index, parent);
        }
 
        @Override
-       public boolean add(Buffer buffer) {
+       public boolean add(Buffer buffer) throws IOException {
                checkNotNull(buffer);
 
-               final NotificationListener listener;
+               // view reference accessible outside the lock, but assigned 
inside the locked scope
+               final PipelinedSubpartitionView reader;
 
                synchronized (buffers) {
-                       if (isReleased || isFinished) {
+                       if (isFinished || isReleased) {
                                return false;
                        }
 
                        // Add the buffer and update the stats
                        buffers.add(buffer);
+                       reader = readView;
                        updateStatistics(buffer);
-
-                       // Get the listener...
-                       listener = registeredListener;
-                       registeredListener = null;
                }
 
                // Notify the listener outside of the synchronized block
-               if (listener != null) {
-                       listener.onNotification();
+               if (reader != null) {
+                       reader.notifyBuffersAvailable(1);
                }
 
                return true;
@@ -90,36 +87,34 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        @Override
        public void finish() throws IOException {
-               final NotificationListener listener;
+               final Buffer buffer = 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+               // view reference accessible outside the lock, but assigned 
inside the locked scope
+               final PipelinedSubpartitionView reader;
 
                synchronized (buffers) {
-                       if (isReleased || isFinished) {
+                       if (isFinished || isReleased) {
                                return;
                        }
 
-                       final Buffer buffer = 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
                        buffers.add(buffer);
+                       reader = readView;
                        updateStatistics(buffer);
 
                        isFinished = true;
-
-                       LOG.debug("Finished {}.", this);
-
-                       // Get the listener...
-                       listener = registeredListener;
-                       registeredListener = null;
                }
 
+               LOG.debug("Finished {}.", this);
+
                // Notify the listener outside of the synchronized block
-               if (listener != null) {
-                       listener.onNotification();
+               if (reader != null) {
+                       reader.notifyBuffersAvailable(1);
                }
        }
 
        @Override
        public void release() {
-               final NotificationListener listener;
+               // view reference accessible outside the lock, but assigned 
inside the locked scope
                final PipelinedSubpartitionView view;
 
                synchronized (buffers) {
@@ -130,40 +125,35 @@ class PipelinedSubpartition extends ResultSubpartition {
                        // Release all available buffers
                        Buffer buffer;
                        while ((buffer = buffers.poll()) != null) {
-                               if (!buffer.isRecycled()) {
-                                       buffer.recycle();
-                               }
+                               buffer.recycle();
                        }
 
                        // Get the view...
                        view = readView;
                        readView = null;
 
-                       // Get the listener...
-                       listener = registeredListener;
-                       registeredListener = null;
-
                        // Make sure that no further buffers are added to the 
subpartition
                        isReleased = true;
-
-                       LOG.debug("Released {}.", this);
                }
 
+               LOG.debug("Released {}.", this);
+
                // Release all resources of the view
                if (view != null) {
                        view.releaseAllResources();
                }
+       }
 
-               // Notify the listener outside of the synchronized block
-               if (listener != null) {
-                       listener.onNotification();
+       Buffer pollBuffer() {
+               synchronized (buffers) {
+                       return buffers.pollFirst();
                }
        }
 
        @Override
        public int releaseMemory() {
-               // The pipelined subpartition does not react to memory release 
requests. The buffers will be
-               // recycled by the consuming task.
+               // The pipelined subpartition does not react to memory release 
requests.
+               // The buffers will be recycled by the consuming task.
                return 0;
        }
 
@@ -173,52 +163,42 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public PipelinedSubpartitionView createReadView(BufferProvider 
bufferProvider) {
-               synchronized (buffers) {
-                       if (readView != null) {
-                               throw new IllegalStateException("Subpartition " 
+ index + " of "
-                                               + parent.getPartitionId() + " 
is being or already has been " +
-                                               "consumed, but pipelined 
subpartitions can only be consumed once.");
-                       }
+       public PipelinedSubpartitionView createReadView(BufferProvider 
bufferProvider, BufferAvailabilityListener availabilityListener) throws 
IOException {
+               final int queueSize;
 
-                       readView = new PipelinedSubpartitionView(this);
+               synchronized (buffers) {
+                       checkState(!isReleased);
+                       checkState(readView == null,
+                                       "Subpartition %s of is being (or 
already has been) consumed, " +
+                                       "but pipelined subpartitions can only 
be consumed once.", index, parent.getPartitionId());
 
-                       LOG.debug("Created read view for subpartition {} of 
partition {}.", index, parent.getPartitionId());
+                       LOG.debug("Creating read view for subpartition {} of 
partition {}.", index, parent.getPartitionId());
 
-                       return readView;
+                       queueSize = buffers.size();
+                       readView = new PipelinedSubpartitionView(this, 
availabilityListener);
                }
+
+               readView.notifyBuffersAvailable(queueSize);
+
+               return readView;
        }
 
        @Override
        public String toString() {
-               synchronized (buffers) {
-                       return String.format("PipelinedSubpartition [number of 
buffers: %d (%d bytes), " +
-                                                       "finished? %s, read 
view? %s]",
-                                       getTotalNumberOfBuffers(), 
getTotalNumberOfBytes(), isFinished, readView != null);
-               }
-       }
+               final long numBuffers;
+               final long numBytes;
+               final boolean finished;
+               final boolean hasReadView;
 
-       /**
-        * Registers a listener with this subpartition and returns whether the 
registration was
-        * successful.
-        *
-        * <p> A registered listener is notified when the state of the 
subpartition changes. After a
-        * notification, the listener is unregistered. Only a single listener 
is allowed to be
-        * registered.
-        */
-       boolean registerListener(NotificationListener listener) {
                synchronized (buffers) {
-                       if (!buffers.isEmpty() || isReleased) {
-                               return false;
-                       }
-
-                       if (registeredListener == null) {
-                               registeredListener = listener;
-
-                               return true;
-                       }
-
-                       throw new IllegalStateException("Already registered 
listener.");
+                       numBuffers = getTotalNumberOfBuffers();
+                       numBytes = getTotalNumberOfBytes();
+                       finished = isFinished;
+                       hasReadView = readView != null;
                }
+
+               return String.format(
+                               "PipelinedSubpartition [number of buffers: %d 
(%d bytes), finished? %s, read view? %s]",
+                               numBuffers, numBytes, finished, hasReadView);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index f8d81a4..52c78ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -33,23 +33,25 @@ class PipelinedSubpartitionView implements 
ResultSubpartitionView {
        /** The subpartition this view belongs to. */
        private final PipelinedSubpartition parent;
 
+       private final BufferAvailabilityListener availabilityListener;
+
        /** Flag indicating whether this view has been released. */
-       private AtomicBoolean isReleased = new AtomicBoolean();
+       private final AtomicBoolean isReleased;
 
-       PipelinedSubpartitionView(PipelinedSubpartition parent) {
+       PipelinedSubpartitionView(PipelinedSubpartition parent, 
BufferAvailabilityListener listener) {
                this.parent = checkNotNull(parent);
+               this.availabilityListener = checkNotNull(listener);
+               this.isReleased = new AtomicBoolean();
        }
 
        @Override
        public Buffer getNextBuffer() {
-               synchronized (parent.buffers) {
-                       return parent.buffers.poll();
-               }
+               return parent.pollBuffer();
        }
 
        @Override
-       public boolean registerListener(NotificationListener listener) {
-               return !isReleased.get() && parent.registerListener(listener);
+       public void notifyBuffersAvailable(long numBuffers) throws IOException {
+               availabilityListener.notifyBuffersAvailable(numBuffers);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index c30f333..9da919e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -131,7 +130,6 @@ public class ResultPartition implements BufferPoolOwner {
                ResultPartitionManager partitionManager,
                ResultPartitionConsumableNotifier partitionConsumableNotifier,
                IOManager ioManager,
-               IOMode defaultIoMode,
                boolean sendScheduleOrUpdateConsumersMessage) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
@@ -147,8 +145,7 @@ public class ResultPartition implements BufferPoolOwner {
                switch (partitionType) {
                        case BLOCKING:
                                for (int i = 0; i < subpartitions.length; i++) {
-                                       subpartitions[i] = new 
SpillableSubpartition(
-                                                       i, this, ioManager, 
defaultIoMode);
+                                       subpartitions[i] = new 
SpillableSubpartition(i, this, ioManager);
                                }
 
                                break;
@@ -321,7 +318,7 @@ public class ResultPartition implements BufferPoolOwner {
        /**
         * Returns the requested subpartition.
         */
-       public ResultSubpartitionView createSubpartitionView(int index, 
BufferProvider bufferProvider) throws IOException {
+       public ResultSubpartitionView createSubpartitionView(int index, 
BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) 
throws IOException {
                int refCnt = pendingReferences.get();
 
                checkState(refCnt != -1, "Partition released.");
@@ -329,7 +326,7 @@ public class ResultPartition implements BufferPoolOwner {
 
                checkElementIndex(index, subpartitions.length, "Subpartition 
not found.");
 
-               ResultSubpartitionView readView = 
subpartitions[index].createReadView(bufferProvider);
+               ResultSubpartitionView readView = 
subpartitions[index].createReadView(bufferProvider, availabilityListener);
 
                LOG.debug("Created {}", readView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 9da3e14..8ad3e34 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -66,7 +66,8 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
        public ResultSubpartitionView createSubpartitionView(
                        ResultPartitionID partitionId,
                        int subpartitionIndex,
-                       BufferProvider bufferProvider) throws IOException {
+                       BufferProvider bufferProvider,
+                       BufferAvailabilityListener availabilityListener) throws 
IOException {
 
                synchronized (registeredPartitions) {
                        final ResultPartition partition = 
registeredPartitions.get(partitionId.getProducerId(),
@@ -78,7 +79,7 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
 
                        LOG.debug("Requesting subpartition {} of {}.", 
subpartitionIndex, partition);
 
-                       return 
partition.createSubpartitionView(subpartitionIndex, bufferProvider);
+                       return 
partition.createSubpartitionView(subpartitionIndex, bufferProvider, 
availabilityListener);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 23dd1d3..3fbfd49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -30,6 +30,7 @@ public interface ResultPartitionProvider {
        ResultSubpartitionView createSubpartitionView(
                        ResultPartitionID partitionId,
                        int index,
-                       BufferProvider bufferProvider) throws IOException;
+                       BufferProvider bufferProvider,
+                       BufferAvailabilityListener availabilityListener) throws 
IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index b7ca9c4..0fafe79 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -37,7 +37,7 @@ public abstract class ResultSubpartition {
        // - Statistics 
----------------------------------------------------------
 
        /** The total number of buffers (both data and event buffers) */
-       private int totalNumberOfBuffers;
+       private long totalNumberOfBuffers;
 
        /** The total number of bytes (both data and event buffers) */
        private long totalNumberOfBytes;
@@ -52,7 +52,7 @@ public abstract class ResultSubpartition {
                totalNumberOfBytes += buffer.getSize();
        }
 
-       protected int getTotalNumberOfBuffers() {
+       protected long getTotalNumberOfBuffers() {
                return totalNumberOfBuffers;
        }
 
@@ -77,7 +77,7 @@ public abstract class ResultSubpartition {
 
        abstract public void release() throws IOException;
 
-       abstract public ResultSubpartitionView createReadView(BufferProvider 
bufferProvider) throws IOException;
+       abstract public ResultSubpartitionView createReadView(BufferProvider 
bufferProvider, BufferAvailabilityListener availabilityListener) throws 
IOException;
 
        abstract int releaseMemory() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index cfc5455..98be90f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
 
 import java.io.IOException;
 
@@ -41,13 +40,7 @@ public interface ResultSubpartitionView {
         */
        Buffer getNextBuffer() throws IOException, InterruptedException;
 
-       /**
-        * Subscribes to data availability notifications.
-        * <p>
-        * Returns whether the subscription was successful. A subscription 
fails,
-        * if there is data available.
-        */
-       boolean registerListener(NotificationListener listener) throws 
IOException;
+       void notifyBuffersAvailable(long buffers) throws IOException;
 
        void releaseAllResources() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3e4692a..b584ebb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,42 +18,54 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A blocking in-memory subpartition, which is able to spill to disk.
+ * A spillable sub partition starts out in-memory and spills to disk if asked
+ * to do so.
  *
- * <p> Buffers are kept in-memory as long as possible. If not possible 
anymore, all buffers are
- * spilled to disk.
+ * <p>Buffers for the partition come from a {@link BufferPool}. The buffer pool
+ * is also responsible to trigger the release of the buffers if it needs them
+ * back. At this point, the spillable sub partition will write all in-memory
+ * buffers to disk. All added buffers after that point directly go to disk.
+ *
+ * <p>This partition type is used for {@link ResultPartitionType#BLOCKING}
+ * results that are fully produced before they can be consumed. At the point
+ * when they are consumed, the buffers are (i) all in-memory, (ii) currently
+ * being spilled to disk, or (iii) completely spilled to disk. Depending on
+ * this state, different reader variants are returned (see
+ * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
+ *
+ * <p>Since the network buffer pool size is usually quite small (default is
+ * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most
+ * spillable partitions will be spilled for real-world data sets.
  */
 class SpillableSubpartition extends ResultSubpartition {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SpillableSubpartition.class);
 
-       /** All buffers of this subpartition. */
-       final ArrayList<Buffer> buffers = new ArrayList<Buffer>();
+       /** Buffers are kept in this queue as long as we weren't asked to release 
any. */
+       private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
 
-       /** The I/O manager to create the spill writer from. */
-       final IOManager ioManager;
-
-       /** The default I/O mode to use. */
-       final IOMode ioMode;
+       /** The I/O manager used for spilling buffers to disk. */
+       private final IOManager ioManager;
 
        /** The writer used for spilling. As long as this is null, we are 
in-memory. */
-       BufferFileWriter spillWriter;
+       private BufferFileWriter spillWriter;
 
        /** Flag indicating whether the subpartition has been finished. */
        private boolean isFinished;
@@ -64,11 +76,10 @@ class SpillableSubpartition extends ResultSubpartition {
        /** The read view to consume this subpartition. */
        private ResultSubpartitionView readView;
 
-       SpillableSubpartition(int index, ResultPartition parent, IOManager 
ioManager, IOMode ioMode) {
+       SpillableSubpartition(int index, ResultPartition parent, IOManager 
ioManager) {
                super(index, parent);
 
                this.ioManager = checkNotNull(ioManager);
-               this.ioMode = checkNotNull(ioMode);
        }
 
        @Override
@@ -80,7 +91,6 @@ class SpillableSubpartition extends ResultSubpartition {
                                return false;
                        }
 
-                       // In-memory
                        if (spillWriter == null) {
                                buffers.add(buffer);
 
@@ -88,7 +98,7 @@ class SpillableSubpartition extends ResultSubpartition {
                        }
                }
 
-               // Else: Spilling
+               // Didn't return early => go to disk
                spillWriter.writeBlock(buffer);
 
                return true;
@@ -102,7 +112,7 @@ class SpillableSubpartition extends ResultSubpartition {
                        }
                }
 
-               // If we are spilling/have spilled, wait for the writer to 
finish.
+               // If we are spilling/have spilled, wait for the writer to 
finish
                if (spillWriter != null) {
                        spillWriter.close();
                }
@@ -117,51 +127,93 @@ class SpillableSubpartition extends ResultSubpartition {
                                return;
                        }
 
-                       // Recycle all in-memory buffers
-                       for (Buffer buffer : buffers) {
-                               buffer.recycle();
-                       }
-
-                       buffers.clear();
-                       buffers.trimToSize();
+                       view = readView;
 
-                       // If we are spilling/have spilled, wait for the writer 
to finish and delete the file.
-                       if (spillWriter != null) {
-                               spillWriter.closeAndDelete();
+                       // No consumer yet, we are responsible for cleaning 
everything up. If
+                       // one is available, the view is responsible for 
cleaning up (see
+                       // below).
+                       if (view == null) {
+                               for (Buffer buffer : buffers) {
+                                       buffer.recycle();
+                               }
+                               buffers.clear();
+
+                               // TODO This can block until all buffers are 
written out to
+                               // disk if a spill is in-progress before 
deleting the file.
+                               // It is possibly called from the Netty event 
loop threads,
+                               // which can bring down the network.
+                               if (spillWriter != null) {
+                                       spillWriter.closeAndDelete();
+                               }
                        }
 
-                       // Get the view...
-                       view = readView;
-                       readView = null;
-
                        isReleased = true;
                }
 
-               // Release the view outside of the synchronized block
                if (view != null) {
-                       view.notifySubpartitionConsumed();
+                       view.releaseAllResources();
+               }
+       }
+
+       @Override
+       public ResultSubpartitionView createReadView(BufferProvider 
bufferProvider, BufferAvailabilityListener availabilityListener) throws 
IOException {
+               synchronized (buffers) {
+                       if (!isFinished) {
+                               throw new IllegalStateException("Subpartition 
has not been finished yet, " +
+                                       "but blocking subpartitions can only be 
consumed after they have " +
+                                       "been finished.");
+                       }
+
+                       if (readView != null) {
+                               throw new IllegalStateException("Subpartition 
is being or already has been " +
+                                       "consumed, but we currently allow 
subpartitions to only be consumed once.");
+                       }
+
+                       if (spillWriter != null) {
+                               readView = new SpilledSubpartitionView(
+                                       this,
+                                       bufferProvider.getMemorySegmentSize(),
+                                       spillWriter,
+                                       getTotalNumberOfBuffers(),
+                                       availabilityListener);
+                       } else {
+                               readView = new SpillableSubpartitionView(
+                                       this,
+                                       buffers,
+                                       ioManager,
+                                       bufferProvider.getMemorySegmentSize(),
+                                       availabilityListener);
+                       }
+
+                       return readView;
                }
        }
 
        @Override
        public int releaseMemory() throws IOException {
                synchronized (buffers) {
-                       if (spillWriter == null) {
-                               // Create the spill writer
+                       ResultSubpartitionView view = readView;
+
+                       if (view != null && view.getClass() == 
SpillableSubpartitionView.class) {
+                               // If there is a spillable view, it's the 
responsibility of the
+                               // view to release memory.
+                               SpillableSubpartitionView spillableView = 
(SpillableSubpartitionView) view;
+                               return spillableView.releaseMemory();
+                       } else if (spillWriter == null) {
+                               // No view and in-memory => spill to disk
                                spillWriter = 
ioManager.createBufferFileWriter(ioManager.createChannel());
 
-                               final int numberOfBuffers = buffers.size();
-
+                               int numberOfBuffers = buffers.size();
                                long spilledBytes = 0;
 
                                // Spill all buffers
                                for (int i = 0; i < numberOfBuffers; i++) {
-                                       Buffer buffer = buffers.remove(0);
+                                       Buffer buffer = buffers.remove();
                                        spilledBytes += buffer.getSize();
                                        spillWriter.writeBlock(buffer);
                                }
 
-                               LOG.debug("Spilled {} bytes for sub partition 
{} of {}.", spilledBytes, index, parent.getPartitionId());
+                               LOG.debug("Spilling {} bytes for sub partition 
{} of {}.", spilledBytes, index, parent.getPartitionId());
 
                                return numberOfBuffers;
                        }
@@ -177,54 +229,11 @@ class SpillableSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public ResultSubpartitionView createReadView(BufferProvider 
bufferProvider) throws IOException {
-               synchronized (buffers) {
-                       if (!isFinished) {
-                               throw new IllegalStateException("Subpartition 
has not been finished yet, " +
-                                               "but blocking subpartitions can 
only be consumed after they have " +
-                                               "been finished.");
-                       }
-
-                       if (readView != null) {
-                               throw new IllegalStateException("Subpartition 
is being or already has been " +
-                                               "consumed, but we currently 
allow subpartitions to only be consumed once.");
-                       }
-
-                       // Spilled if closed and no outstanding write requests
-                       boolean isSpilled = spillWriter != null && 
(spillWriter.isClosed()
-                                       || 
spillWriter.getNumberOfOutstandingRequests() == 0);
-
-                       if (isSpilled) {
-                               if (ioMode.isSynchronous()) {
-                                       readView = new 
SpilledSubpartitionViewSyncIO(
-                                                       this,
-                                                       
bufferProvider.getMemorySegmentSize(),
-                                                       
spillWriter.getChannelID(),
-                                                       0);
-                               }
-                               else {
-                                       readView = new 
SpilledSubpartitionViewAsyncIO(
-                                                       this,
-                                                       bufferProvider,
-                                                       ioManager,
-                                                       
spillWriter.getChannelID(),
-                                                       0);
-                               }
-                       }
-                       else {
-                               readView = new SpillableSubpartitionView(
-                                               this, bufferProvider, 
buffers.size(), ioMode);
-                       }
-
-                       return readView;
-               }
-       }
-
-       @Override
        public String toString() {
                return String.format("SpillableSubpartition [%d number of 
buffers (%d bytes)," +
                                                "finished? %s, read view? %s, 
spilled? %s]",
                                getTotalNumberOfBuffers(), 
getTotalNumberOfBytes(), isFinished, readView != null,
                                spillWriter != null);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 29c2002..8119ecc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 class SpillableSubpartitionView implements ResultSubpartitionView {
@@ -34,146 +33,163 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
        /** The subpartition this view belongs to. */
        private final SpillableSubpartition parent;
 
-       /** The buffer provider to read buffers into (spilling case). */
-       private final BufferProvider bufferProvider;
+       /** All buffers of this subpartition. Access to the buffers is 
synchronized on this object. */
+       private final ArrayDeque<Buffer> buffers;
 
-       /** The number of buffers in-memory at the subpartition. */
-       private final int numberOfBuffers;
+       /** IO manager if we need to spill (for spilled case). */
+       private final IOManager ioManager;
 
-       /** The default I/O mode to use. */
-       private final IOMode ioMode;
+       /** Size of memory segments (for spilled case). */
+       private final int memorySegmentSize;
 
-       private ResultSubpartitionView spilledView;
-
-       private int currentQueuePosition;
-
-       private long currentBytesRead;
+       /**
+        * The buffer availability listener. As long as in-memory, notifications
+        * happen on a buffer per buffer basis as spilling may happen after a
+        * notification has been sent out.
+        */
+       private final BufferAvailabilityListener listener;
 
        private final AtomicBoolean isReleased = new AtomicBoolean(false);
 
-       public SpillableSubpartitionView(
-                       SpillableSubpartition parent,
-                       BufferProvider bufferProvider,
-                       int numberOfBuffers,
-                       IOMode ioMode) {
-
-               this.parent = checkNotNull(parent);
-               this.bufferProvider = checkNotNull(bufferProvider);
-               checkArgument(numberOfBuffers >= 0);
-               this.numberOfBuffers = numberOfBuffers;
-               this.ioMode = checkNotNull(ioMode);
-       }
-
-       @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
-
-               if (isReleased.get()) {
-                       return null;
-               }
-
-               // 1) In-memory
-               synchronized (parent.buffers) {
-                       if (parent.isReleased()) {
-                               return null;
-                       }
+       /**
+        * The next buffer to hand out. Every time this is set to a non-null 
value,
+        * a listener notification happens.
+        */
+       private Buffer nextBuffer;
 
-                       if (parent.spillWriter == null) {
-                               if (currentQueuePosition < numberOfBuffers) {
-                                       Buffer buffer = 
parent.buffers.get(currentQueuePosition);
+       private volatile SpilledSubpartitionView spilledView;
 
-                                       buffer.retain();
+       SpillableSubpartitionView(
+               SpillableSubpartition parent,
+               ArrayDeque<Buffer> buffers,
+               IOManager ioManager,
+               int memorySegmentSize,
+               BufferAvailabilityListener listener) {
 
-                                       // TODO Fix hard coding of 8 bytes for 
the header
-                                       currentBytesRead += buffer.getSize() + 
8;
-                                       currentQueuePosition++;
-
-                                       return buffer;
-                               }
+               this.parent = checkNotNull(parent);
+               this.buffers = checkNotNull(buffers);
+               this.ioManager = checkNotNull(ioManager);
+               this.memorySegmentSize = memorySegmentSize;
+               this.listener = checkNotNull(listener);
 
-                               return null;
-                       }
+               synchronized (buffers) {
+                       nextBuffer = buffers.poll();
                }
 
-               // 2) Spilled
-               if (spilledView != null) {
-                       return spilledView.getNextBuffer();
+               if (nextBuffer != null) {
+                       listener.notifyBuffersAvailable(1);
                }
+       }
 
-               // 3) Spilling
-               // Make sure that all buffers are written before consuming 
them. We can't block here,
-               // because this might be called from an network I/O thread.
-               if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
-                       return null;
-               }
+       int releaseMemory() throws IOException {
+               synchronized (buffers) {
+                       if (spilledView != null || nextBuffer == null) {
+                               // Already spilled or nothing in-memory
+                               return 0;
+                       } else {
+                               // We don't touch next buffer, because a 
notification has
+                               // already been sent for it. Only when it is 
consumed, will
+                               // it be recycled.
+
+                               // Create the spill writer and write all 
buffers to disk
+                               BufferFileWriter spillWriter = 
ioManager.createBufferFileWriter(ioManager.createChannel());
+
+                               int numBuffers = buffers.size();
+                               for (int i = 0; i < numBuffers; i++) {
+                                       Buffer buffer = buffers.remove();
+                                       try {
+                                               spillWriter.writeBlock(buffer);
+                                       } finally {
+                                               buffer.recycle();
+                                       }
+                               }
 
-               if (ioMode.isSynchronous()) {
-                       spilledView = new SpilledSubpartitionViewSyncIO(
-                                       parent,
-                                       bufferProvider.getMemorySegmentSize(),
-                                       parent.spillWriter.getChannelID(),
-                                       currentBytesRead);
-               }
-               else {
-                       spilledView = new SpilledSubpartitionViewAsyncIO(
+                               spilledView = new SpilledSubpartitionView(
                                        parent,
-                                       bufferProvider,
-                                       parent.ioManager,
-                                       parent.spillWriter.getChannelID(),
-                                       currentBytesRead);
-               }
+                                       memorySegmentSize,
+                                       spillWriter,
+                                       numBuffers,
+                                       listener);
 
-               return spilledView.getNextBuffer();
+                               return numBuffers;
+                       }
+               }
        }
 
        @Override
-       public boolean registerListener(NotificationListener listener) throws 
IOException {
-               if (spilledView == null) {
-                       synchronized (parent.buffers) {
-                               // Didn't spill yet, buffers should be in-memory
-                               if (parent.spillWriter == null) {
-                                       return false;
+       public Buffer getNextBuffer() throws IOException, InterruptedException {
+               synchronized (buffers) {
+                       if (isReleased.get()) {
+                               return null;
+                       } else if (nextBuffer != null) {
+                               Buffer current = nextBuffer;
+                               nextBuffer = buffers.poll();
+
+                               if (nextBuffer != null) {
+                                       listener.notifyBuffersAvailable(1);
                                }
-                       }
 
-                       // Spilling
-                       if (parent.spillWriter.getNumberOfOutstandingRequests() 
> 0) {
-                               return 
parent.spillWriter.registerAllRequestsProcessedListener(listener);
+                               return current;
                        }
+               } // else: spilled
 
-                       return false;
+               SpilledSubpartitionView spilled = spilledView;
+               if (spilled != null) {
+                       return spilled.getNextBuffer();
+               } else {
+                       throw new IllegalStateException("No in-memory buffers 
available, but also nothing spilled.");
                }
-
-               return spilledView.registerListener(listener);
        }
 
        @Override
-       public void notifySubpartitionConsumed() throws IOException {
-               parent.onConsumedSubpartition();
+       public void notifyBuffersAvailable(long buffers) throws IOException {
+               // We do the availability listener notification one by one
        }
 
        @Override
        public void releaseAllResources() throws IOException {
                if (isReleased.compareAndSet(false, true)) {
-                       if (spilledView != null) {
-                               spilledView.releaseAllResources();
+                       SpilledSubpartitionView spilled = spilledView;
+                       if (spilled != null) {
+                               spilled.releaseAllResources();
                        }
                }
        }
 
        @Override
+       public void notifySubpartitionConsumed() throws IOException {
+               SpilledSubpartitionView spilled = spilledView;
+               if (spilled != null) {
+                       spilled.notifySubpartitionConsumed();
+               } else {
+                       parent.onConsumedSubpartition();
+               }
+       }
+
+       @Override
        public boolean isReleased() {
-               return parent.isReleased() || isReleased.get();
+               SpilledSubpartitionView spilled = spilledView;
+               if (spilled != null) {
+                       return spilled.isReleased();
+               } else {
+                       return parent.isReleased() || isReleased.get();
+               }
        }
 
        @Override
        public Throwable getFailureCause() {
-               return parent.getFailureCause();
+               SpilledSubpartitionView spilled = spilledView;
+               if (spilled != null) {
+                       return spilled.getFailureCause();
+               } else {
+                       return parent.getFailureCause();
+               }
        }
 
        @Override
        public String toString() {
                return String.format("SpillableSubpartitionView(index: %d) of 
ResultPartition %s",
-                               parent.index,
-                               parent.parent.getPartitionId());
+                       parent.index,
+                       parent.parent.getPartitionId());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
new file mode 100644
index 0000000..b087a4e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Reader for a spilled sub partition.
+ *
+ * <p>The partition availability listener is notified about available buffers
+ * only when the spilling is done. Spilling is done async and if it is still
+ * in progress, we wait with the notification until the spilling is done.
+ *
+ * <p>Reads of the spilled file are done synchronously.
+ */
+class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
+
+       /** The subpartition this view belongs to. */
+       private final ResultSubpartition parent;
+
+       /** Writer for spills. */
+       private final BufferFileWriter spillWriter;
+
+       /** The synchronous file reader to do the actual I/O. */
+       private final BufferFileReader fileReader;
+
+       /** The buffer pool to read data into. */
+       private final SpillReadBufferPool bufferPool;
+
+       /** Buffer availability listener. */
+       private final BufferAvailabilityListener availabilityListener;
+
+       /** The total number of spilled buffers. */
+       private final long numberOfSpilledBuffers;
+
+       /** Flag indicating whether all resources have been released. */
+       private final AtomicBoolean isReleased = new AtomicBoolean();
+
+       /** Flag indicating whether a spill is still in progress. */
+       private volatile boolean isSpillInProgress = true;
+
+       /**
+        * Creates the view and registers it as listener at the spill writer.
+        *
+        * <p>If the writer reports that no spill is in progress any more, the
+        * availability listener is notified right away. Otherwise the
+        * notification happens in {@link #onNotification()}.
+        *
+        * @param parent the subpartition this view belongs to
+        * @param memorySegmentSize size of the memory segments backing the read buffers
+        * @param spillWriter writer whose channel is read back by this view
+        * @param numberOfSpilledBuffers total number of spilled buffers, must not be negative
+        * @param availabilityListener listener to notify about the spilled buffers
+        * @throws IOException forwarded from creating the file reader or from
+        *         registering at the spill writer
+        */
+       SpilledSubpartitionView(
+               ResultSubpartition parent,
+               int memorySegmentSize,
+               BufferFileWriter spillWriter,
+               long numberOfSpilledBuffers,
+               BufferAvailabilityListener availabilityListener) throws IOException {
+
+               // Validate all arguments before creating the reader and the pool, so
+               // that a rejected argument cannot leave a file reader behind that
+               // nobody closes.
+               this.parent = checkNotNull(parent);
+               this.spillWriter = checkNotNull(spillWriter);
+               this.availabilityListener = checkNotNull(availabilityListener);
+               checkArgument(numberOfSpilledBuffers >= 0);
+               this.numberOfSpilledBuffers = numberOfSpilledBuffers;
+
+               this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
+               this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
+
+               // Check whether async spilling is still in progress. If not, this returns
+               // false and we can notify our availability listener about all available buffers.
+               // Otherwise, we notify only when the spill writer callback happens.
+               if (!spillWriter.registerAllRequestsProcessedListener(this)) {
+                       isSpillInProgress = false;
+                       availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+               }
+       }
+
+       /**
+        * This is the call back method for the spill writer. If a spill is still
+        * in progress when this view is created we wait until this method is called
+        * before we notify the availability listener.
+        */
+       @Override
+       public void onNotification() {
+               isSpillInProgress = false;
+               availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+       }
+
+       /**
+        * Reads the next buffer from the spilled file.
+        *
+        * @return the next buffer, or {@code null} if the spill is still in
+        *         progress, the end of the file has been reached, or the read
+        *         buffer pool has already been destroyed
+        * @throws IOException forwarded from the file reader
+        * @throws InterruptedException if interrupted while waiting for a read buffer
+        */
+       @Override
+       public Buffer getNextBuffer() throws IOException, InterruptedException {
+               if (fileReader.hasReachedEndOfFile() || isSpillInProgress) {
+                       return null;
+               }
+
+               // TODO This is fragile as we implicitly expect that multiple calls to
+               // this method don't happen before recycling buffers returned earlier.
+               Buffer buffer = bufferPool.requestBufferBlocking();
+               if (buffer == null) {
+                       // The pool hands out null once it has been destroyed (see
+                       // releaseAllResources()). Report "no buffer" instead of passing
+                       // null on to the file reader.
+                       return null;
+               }
+
+               fileReader.readInto(buffer);
+
+               return buffer;
+       }
+
+       /**
+        * Intentionally empty, see the comment in the method body.
+        */
+       @Override
+       public void notifyBuffersAvailable(long buffers) throws IOException {
+               // We do the availability listener notification either directly on
+               // construction of this view (when everything has been spilled) or
+               // as soon as spilling is done and we are notified about it in the
+               // #onNotification callback.
+       }
+
+       /**
+        * Forwards the consumption notification to the parent subpartition.
+        */
+       @Override
+       public void notifySubpartitionConsumed() throws IOException {
+               parent.onConsumedSubpartition();
+       }
+
+       /**
+        * Releases the spill writer, the file reader and the read buffer pool.
+        * Only the first call has an effect.
+        *
+        * @throws IOException forwarded from closing the writer or the reader
+        */
+       @Override
+       public void releaseAllResources() throws IOException {
+               if (isReleased.compareAndSet(false, true)) {
+                       try {
+                               // TODO This can block until all buffers are written out to
+                               // disk if a spill is in-progress before deleting the file.
+                               // It is possibly called from the Netty event loop threads,
+                               // which can bring down the network.
+                               spillWriter.closeAndDelete();
+                       } finally {
+                               // Close the reader and destroy the pool even if the writer
+                               // failed. Destroying the pool also wakes up threads waiting
+                               // in requestBufferBlocking().
+                               try {
+                                       fileReader.close();
+                               } finally {
+                                       bufferPool.destroy();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * @return {@code true} if either the parent subpartition or this view has been released
+        */
+       @Override
+       public boolean isReleased() {
+               return parent.isReleased() || isReleased.get();
+       }
+
+       /**
+        * @return the failure cause reported by the parent subpartition
+        */
+       @Override
+       public Throwable getFailureCause() {
+               return parent.getFailureCause();
+       }
+
+       @Override
+       public String toString() {
+               return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId());
+       }
+
+       /**
+        * A buffer pool to provide buffer to read the file into.
+        *
+        * <p>This pool ensures that a consuming input gate makes progress in all cases, even when all
+        * buffers of the input gate buffer pool have been requested by remote input channels.
+        *
+        * <p>All state is accessed while holding the monitor of {@code buffers}.
+        */
+       private static class SpillReadBufferPool implements BufferRecycler {
+
+               /** The currently available buffers; also serves as the lock object. */
+               private final Queue<Buffer> buffers;
+
+               /** Set by {@link #destroy()}; afterwards no buffers are handed out. */
+               private boolean isDestroyed;
+
+               /**
+                * Fills the pool with buffers backed by freshly allocated unpooled segments.
+                *
+                * @param numberOfBuffers number of buffers to create
+                * @param memorySegmentSize size of each backing memory segment
+                */
+               SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
+                       this.buffers = new ArrayDeque<>(numberOfBuffers);
+
+                       synchronized (buffers) {
+                               for (int i = 0; i < numberOfBuffers; i++) {
+                                       buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+                               }
+                       }
+               }
+
+               /**
+                * Takes a segment back: frees it if the pool is destroyed, otherwise
+                * wraps it into a new buffer and wakes up waiting requesters.
+                */
+               @Override
+               public void recycle(MemorySegment memorySegment) {
+                       synchronized (buffers) {
+                               if (isDestroyed) {
+                                       memorySegment.free();
+                               } else {
+                                       buffers.add(new Buffer(memorySegment, this));
+                                       buffers.notifyAll();
+                               }
+                       }
+               }
+
+               /**
+                * Waits until a buffer is available or the pool is destroyed.
+                *
+                * @return a buffer, or {@code null} if the pool has been destroyed
+                * @throws InterruptedException if interrupted while waiting
+                */
+               private Buffer requestBufferBlocking() throws InterruptedException {
+                       synchronized (buffers) {
+                               while (true) {
+                                       if (isDestroyed) {
+                                               return null;
+                                       }
+
+                                       Buffer buffer = buffers.poll();
+
+                                       if (buffer != null) {
+                                               return buffer;
+                                       }
+                                       // Else: wait for a buffer
+                                       buffers.wait();
+                               }
+                       }
+               }
+
+               /**
+                * Marks the pool as destroyed and wakes up all waiting requesters.
+                */
+               private void destroy() {
+                       synchronized (buffers) {
+                               isDestroyed = true;
+                               buffers.notifyAll();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
deleted file mode 100644
index daccd28..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * View over a spilled subpartition.
- *
- * <p> Reads are triggered asynchronously in batches of configurable size.
- */
-class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
-
-       private final static int DEFAULT_READ_BATCH_SIZE = 2;
-
-       private final Object lock = new Object();
-
-       /** The subpartition this view belongs to. */
-       private final ResultSubpartition parent;
-
-       /** The buffer provider to get the buffer read everything into. */
-       private final BufferProvider bufferProvider;
-
-       /** The buffer availability listener to be notified on available 
buffers. */
-       private final BufferProviderCallback bufferAvailabilityListener;
-
-       /** The size of read batches. */
-       private final int readBatchSize;
-
-       /**
-        * The size of the current batch (>= 0 and <= the configured batch 
size). Reads are only
-        * triggered when the size of the current batch is 0.
-        */
-       private final AtomicInteger currentBatchSize = new AtomicInteger();
-
-       /** The asynchronous file reader to do the actual I/O. */
-       private final BufferFileReader asyncFileReader;
-
-       /** The buffers, which have been returned from the file reader. */
-       private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new 
ConcurrentLinkedQueue<Buffer>();
-
-       /** A data availability listener. */
-       private NotificationListener registeredListener;
-
-       /** Error, which has occurred in the I/O thread. */
-       private volatile IOException errorInIOThread;
-
-       /** Flag indicating whether all resources have been released. */
-       private volatile boolean isReleased;
-
-       /** Flag indicating whether we reached EOF at the file reader. */
-       private volatile boolean hasReachedEndOfFile;
-
-       /** Spilled file size */
-       private final long fileSize;
-
-       SpilledSubpartitionViewAsyncIO(
-                       ResultSubpartition parent,
-                       BufferProvider bufferProvider,
-                       IOManager ioManager,
-                       FileIOChannel.ID channelId,
-                       long initialSeekPosition) throws IOException {
-
-               this(parent, bufferProvider, ioManager, channelId, 
initialSeekPosition, DEFAULT_READ_BATCH_SIZE);
-       }
-
-       SpilledSubpartitionViewAsyncIO(
-                       ResultSubpartition parent,
-                       BufferProvider bufferProvider,
-                       IOManager ioManager,
-                       FileIOChannel.ID channelId,
-                       long initialSeekPosition,
-                       int readBatchSize) throws IOException {
-
-               checkArgument(initialSeekPosition >= 0, "Initial seek position 
is < 0.");
-               checkArgument(readBatchSize >= 1, "Batch read size < 1.");
-
-               this.parent = checkNotNull(parent);
-               this.bufferProvider = checkNotNull(bufferProvider);
-               this.bufferAvailabilityListener = new 
BufferProviderCallback(this);
-
-               this.asyncFileReader = 
ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
-
-               if (initialSeekPosition > 0) {
-                       asyncFileReader.seekToPosition(initialSeekPosition);
-               }
-
-               this.readBatchSize = readBatchSize;
-
-               this.fileSize = asyncFileReader.getSize();
-
-               // Trigger the initial read requests
-               readNextBatchAsync();
-       }
-
-       @Override
-       public Buffer getNextBuffer() throws IOException {
-               checkError();
-
-               final Buffer buffer = returnedBuffers.poll();
-
-               // No buffer returned from the I/O thread currently. Either the 
current batch is in progress
-               // or we trigger the next one.
-               if (buffer == null) {
-                       if (currentBatchSize.get() == 0) {
-                               readNextBatchAsync();
-                       }
-               }
-               else {
-                       currentBatchSize.decrementAndGet();
-               }
-
-               return buffer;
-       }
-
-       @Override
-       public boolean registerListener(NotificationListener listener) throws 
IOException {
-               checkNotNull(listener);
-
-               checkError();
-
-               synchronized (lock) {
-                       if (isReleased || !returnedBuffers.isEmpty()) {
-                               return false;
-                       }
-
-                       if (registeredListener == null) {
-                               registeredListener = listener;
-
-                               return true;
-                       }
-               }
-
-               throw new IllegalStateException("Already registered listener.");
-       }
-
-       @Override
-       public void notifySubpartitionConsumed() throws IOException {
-               parent.onConsumedSubpartition();
-       }
-
-       @Override
-       public void releaseAllResources() throws IOException {
-               try {
-                       synchronized (lock) {
-                               if (!isReleased) {
-                                       // Recycle all buffers. Buffers, which 
are in flight are recycled as soon as
-                                       // they return from the I/O thread.
-                                       Buffer buffer;
-                                       while ((buffer = 
returnedBuffers.poll()) != null) {
-                                               buffer.recycle();
-                                       }
-
-                                       isReleased = true;
-                               }
-                       }
-               }
-               finally {
-                       asyncFileReader.close();
-               }
-       }
-
-       @Override
-       public boolean isReleased() {
-               return parent.isReleased() || isReleased;
-       }
-
-       @Override
-       public Throwable getFailureCause() {
-               return parent.getFailureCause();
-       }
-
-       /**
-        * Requests buffers from the buffer provider and triggers asynchronous 
read requests to fill
-        * them.
-        *
-        * <p> The number of requested buffers/triggered I/O read requests per 
call depends on the
-        * configured size of batch reads.
-        */
-       private void readNextBatchAsync() throws IOException {
-               // This does not need to be fully synchronized with actually 
reaching EOF as long as
-               // we eventually notice it. In the worst case, we trigger some 
discarded reads and
-               // notice it when the buffers are returned.
-               //
-               // We only trigger reads if the current batch size is 0.
-               if (hasReachedEndOfFile || currentBatchSize.get() != 0) {
-                       return;
-               }
-
-               // Number of successful buffer requests or callback 
registrations. The call back will
-               // trigger the read as soon as a buffer becomes available again.
-               int i = 0;
-
-               while (i < readBatchSize) {
-                       final Buffer buffer = bufferProvider.requestBuffer();
-
-                       if (buffer == null) {
-                               // Listen for buffer availability.
-                               currentBatchSize.incrementAndGet();
-
-                               if 
(bufferProvider.addListener(bufferAvailabilityListener)) {
-                                       i++;
-                               }
-                               else if (bufferProvider.isDestroyed()) {
-                                       currentBatchSize.decrementAndGet();
-                                       return;
-                               }
-                               else {
-                                       // Buffer available again
-                                       currentBatchSize.decrementAndGet();
-                               }
-                       }
-                       else {
-                               currentBatchSize.incrementAndGet();
-
-                               asyncFileReader.readInto(buffer);
-                       }
-               }
-       }
-
-       /**
-        * Returns a buffer from the buffer provider.
-        *
-        * <p> Note: This method is called from the thread recycling the 
available buffer.
-        */
-       private void onAvailableBuffer(Buffer buffer) {
-               try {
-                       asyncFileReader.readInto(buffer);
-               }
-               catch (IOException e) {
-                       notifyError(e);
-               }
-       }
-
-       /**
-        * Returns a successful buffer read request.
-        *
-        * <p> Note: This method is always called from the same I/O thread.
-        */
-       private void returnBufferFromIOThread(Buffer buffer) {
-               final NotificationListener listener;
-
-               synchronized (lock) {
-                       if (hasReachedEndOfFile || isReleased) {
-                               buffer.recycle();
-
-                               return;
-                       }
-
-                       returnedBuffers.add(buffer);
-
-                       listener = registeredListener;
-                       registeredListener = null;
-
-                       // If this was the last buffer before we reached EOF, 
set the corresponding flag to
-                       // ensure that further buffers are correctly recycled 
and eventually no further reads
-                       // are triggered.
-                       if (asyncFileReader.hasReachedEndOfFile()) {
-                               hasReachedEndOfFile = true;
-                       }
-               }
-
-               if (listener != null) {
-                       listener.onNotification();
-               }
-       }
-
-       /**
-        * Notifies the view about an error.
-        */
-       private void notifyError(IOException error) {
-               if (errorInIOThread == null) {
-                       errorInIOThread = error;
-               }
-
-               final NotificationListener listener;
-
-               synchronized (lock) {
-                       listener = registeredListener;
-                       registeredListener = null;
-               }
-
-               if (listener != null) {
-                       listener.onNotification();
-               }
-       }
-
-       /**
-        * Checks whether an error has been reported and rethrow the respective 
Exception, if available.
-        */
-       private void checkError() throws IOException {
-               if (errorInIOThread != null) {
-                       throw errorInIOThread;
-               }
-       }
-
-       /**
-        * Callback from the I/O thread.
-        *
-        * <p> Successful buffer read requests add the buffer to the 
subpartition view, and failed ones
-        * notify about the error.
-        */
-       private static class IOThreadCallback implements 
RequestDoneCallback<Buffer> {
-
-               private final SpilledSubpartitionViewAsyncIO subpartitionView;
-
-               public IOThreadCallback(SpilledSubpartitionViewAsyncIO 
subpartitionView) {
-                       this.subpartitionView = subpartitionView;
-               }
-
-               @Override
-               public void requestSuccessful(Buffer buffer) {
-                       subpartitionView.returnBufferFromIOThread(buffer);
-               }
-
-               @Override
-               public void requestFailed(Buffer buffer, IOException error) {
-                       // Recycle the buffer and forward the error
-                       buffer.recycle();
-
-                       subpartitionView.notifyError(error);
-               }
-       }
-
-       @Override
-       public String toString() {
-               return String.format("SpilledSubpartitionView[async](index: %d, 
file size: %d bytes) of ResultPartition %s",
-                               parent.index,
-                               fileSize,
-                               parent.parent.getPartitionId());
-       }
-
-       /**
-        * Callback from the buffer provider.
-        */
-       private static class BufferProviderCallback implements 
EventListener<Buffer> {
-
-               private final SpilledSubpartitionViewAsyncIO subpartitionView;
-
-               private BufferProviderCallback(SpilledSubpartitionViewAsyncIO 
subpartitionView) {
-                       this.subpartitionView = subpartitionView;
-               }
-
-               @Override
-               public void onEvent(Buffer buffer) {
-                       if (buffer == null) {
-                               return;
-                       }
-
-                       subpartitionView.onAvailableBuffer(buffer);
-               }
-       }
-}

Reply via email to