http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
deleted file mode 100644
index c86697f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * View over a spilled subpartition.
- *
- * <p> Reads are done synchronously.
- */
-class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
-
-       /** The subpartition this view belongs to. */
-       private final ResultSubpartition parent;
-
-       /** The synchronous file reader to do the actual I/O. */
-       private final BufferFileReader fileReader;
-
-       /** The buffer pool to read data into. */
-       private final SpillReadBufferPool bufferPool;
-
-       /** Flag indicating whether all resources have been released. */
-       private AtomicBoolean isReleased = new AtomicBoolean();
-
-       /** Spilled file size */
-       private final long fileSize;
-
-       SpilledSubpartitionViewSyncIO(
-                       ResultSubpartition parent,
-                       int memorySegmentSize,
-                       FileIOChannel.ID channelId,
-                       long initialSeekPosition) throws IOException {
-
-               checkArgument(initialSeekPosition >= 0, "Initial seek position 
is < 0.");
-
-               this.parent = checkNotNull(parent);
-
-               this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
-
-               this.fileReader = new SynchronousBufferFileReader(channelId, 
false);
-
-               if (initialSeekPosition > 0) {
-                       fileReader.seekToPosition(initialSeekPosition);
-               }
-
-               this.fileSize = fileReader.getSize();
-       }
-
-       @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
-
-               if (fileReader.hasReachedEndOfFile()) {
-                       return null;
-               }
-
-               // It's OK to request the buffer in a blocking fashion as the 
buffer pool is NOT shared
-               // among all consumed subpartitions.
-               final Buffer buffer = bufferPool.requestBufferBlocking();
-
-               fileReader.readInto(buffer);
-
-               return buffer;
-       }
-
-       @Override
-       public boolean registerListener(NotificationListener listener) throws 
IOException {
-               return false;
-       }
-
-       @Override
-       public void notifySubpartitionConsumed() throws IOException {
-               parent.onConsumedSubpartition();
-       }
-
-       @Override
-       public void releaseAllResources() throws IOException {
-               if (isReleased.compareAndSet(false, true)) {
-                       fileReader.close();
-                       bufferPool.destroy();
-               }
-       }
-
-       @Override
-       public boolean isReleased() {
-               return parent.isReleased() || isReleased.get();
-       }
-
-       @Override
-       public Throwable getFailureCause() {
-               return parent.getFailureCause();
-       }
-
-       @Override
-       public String toString() {
-               return String.format("SpilledSubpartitionView[sync](index: %d, 
file size: %d bytes) of ResultPartition %s",
-                               parent.index,
-                               fileSize,
-                               parent.parent.getPartitionId());
-       }
-
-       /**
-        * A buffer pool to provide buffer to read the file into.
-        *
-        * <p> This pool ensures that a consuming input gate makes progress in 
all cases, even when all
-        * buffers of the input gate buffer pool have been requested by remote 
input channels.
-        *
-        * TODO Replace with asynchronous buffer pool request as this 
introduces extra buffers per
-        * consumed subpartition.
-        */
-       private static class SpillReadBufferPool implements BufferRecycler {
-
-               private final Queue<Buffer> buffers;
-
-               private boolean isDestroyed;
-
-               public SpillReadBufferPool(int numberOfBuffers, int 
memorySegmentSize) {
-                       this.buffers = new ArrayDeque<Buffer>(numberOfBuffers);
-
-                       synchronized (buffers) {
-                               for (int i = 0; i < numberOfBuffers; i++) {
-                                       buffers.add(new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
-                               }
-                       }
-               }
-
-               @Override
-               public void recycle(MemorySegment memorySegment) {
-                       synchronized (buffers) {
-                               if (isDestroyed) {
-                                       memorySegment.free();
-                               }
-                               else {
-                                       buffers.add(new Buffer(memorySegment, 
this));
-                                       buffers.notifyAll();
-                               }
-                       }
-               }
-
-               private Buffer requestBufferBlocking() throws 
InterruptedException {
-                       synchronized (buffers) {
-                               while (true) {
-                                       if (isDestroyed) {
-                                               return null;
-                                       }
-
-                                       Buffer buffer = buffers.poll();
-
-                                       if (buffer != null) {
-                                               return buffer;
-                                       }
-                                       // Else: wait for a buffer
-                                       buffers.wait();
-                               }
-                       }
-               }
-
-               private void destroy() {
-                       synchronized (buffers) {
-                               isDestroyed = true;
-                               buffers.notifyAll();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index 885e738..3e93ae6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -34,18 +34,35 @@ public class BufferOrEvent {
 
        private final AbstractEvent event;
 
+       /**
+        * Indicate availability of further instances for the union input gate.
+        * This is not needed outside of the input gate unioning logic and 
cannot
+        * be set outside of the consumer package.
+        */
+       private final boolean moreAvailable;
+
        private int channelIndex;
 
-       public BufferOrEvent(Buffer buffer, int channelIndex) {
+       BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) {
                this.buffer = checkNotNull(buffer);
                this.event = null;
                this.channelIndex = channelIndex;
+               this.moreAvailable = moreAvailable;
        }
 
-       public BufferOrEvent(AbstractEvent event, int channelIndex) {
+       BufferOrEvent(AbstractEvent event, int channelIndex, boolean 
moreAvailable) {
                this.buffer = null;
                this.event = checkNotNull(event);
                this.channelIndex = channelIndex;
+               this.moreAvailable = moreAvailable;
+       }
+
+       public BufferOrEvent(Buffer buffer, int channelIndex) {
+               this(buffer, channelIndex, true);
+       }
+
+       public BufferOrEvent(AbstractEvent event, int channelIndex) {
+               this(event, channelIndex, true);
        }
 
        public boolean isBuffer() {
@@ -73,6 +90,10 @@ public class BufferOrEvent {
                this.channelIndex = channelIndex;
        }
 
+       boolean moreAvailable() {
+               return moreAvailable;
+       }
+
        @Override
        public String toString() {
                return String.format("BufferOrEvent [%s, channelIndex = %d]",

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 5d82903..a48bfaf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -101,10 +101,19 @@ public abstract class InputChannel {
        }
 
        /**
-        * Notifies the owning {@link SingleInputGate} about an available 
{@link Buffer} instance.
+        * Notifies the owning {@link SingleInputGate} that this channel became 
non-empty.
+        * 
+        * <p>This is guaranteed to be called only when a Buffer was added to a 
previously
+        * empty input channel. The notion of empty is atomically consistent 
with the flag
+        * {@link BufferAndAvailability#moreAvailable()} when polling the next 
buffer
+        * from this channel.
+        * 
+        * <p><b>Note:</b> When the input channel observes an exception, this
+        * method is called regardless of whether the channel was empty before. 
That ensures
+        * that the parent InputGate will always be notified about the 
exception.
         */
-       protected void notifyAvailableBuffer() {
-               inputGate.onAvailableBuffer(this);
+       protected void notifyChannelNonEmpty() {
+               inputGate.notifyChannelNonEmpty(this);
        }
 
        // 
------------------------------------------------------------------------
@@ -123,7 +132,7 @@ public abstract class InputChannel {
        /**
         * Returns the next buffer from the consumed subpartition.
         */
-       abstract Buffer getNextBuffer() throws IOException, 
InterruptedException;
+       abstract BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException;
 
        // 
------------------------------------------------------------------------
        // Task events
@@ -182,7 +191,7 @@ public abstract class InputChannel {
        protected void setError(Throwable cause) {
                if (this.cause.compareAndSet(null, checkNotNull(cause))) {
                        // Notify the input gate.
-                       notifyAvailableBuffer();
+                       notifyChannelNonEmpty();
                }
        }
 
@@ -225,4 +234,28 @@ public abstract class InputChannel {
                // Reached maximum backoff
                return false;
        }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A combination of a {@link Buffer} and a flag indicating availability 
of further buffers.
+        */
+       public static final class BufferAndAvailability {
+
+               private final Buffer buffer;
+               private final boolean moreAvailable;
+
+               public BufferAndAvailability(Buffer buffer, boolean 
moreAvailable) {
+                       this.buffer = checkNotNull(buffer);
+                       this.moreAvailable = moreAvailable;
+               }
+
+               public Buffer buffer() {
+                       return buffer;
+               }
+
+               public boolean moreAvailable() {
+                       return moreAvailable;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 1cd5fc5..1f2182e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
@@ -77,7 +76,7 @@ public interface InputGate {
 
        void sendTaskEvent(TaskEvent event) throws IOException;
 
-       void registerListener(EventListener<InputGate> listener);
+       void registerListener(InputGateListener listener);
 
        int getPageSize();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
new file mode 100644
index 0000000..00fa782
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+/**
+ * Listener interface implemented by consumers of {@link InputGate} instances
+ * that want to be notified of availability of buffer or event instances.
+ */
+public interface InputGateListener {
+
+       /**
+        * Notification callback if the input gate moves from zero to non-zero
+        * available input channels with data.
+        *
+        * @param inputGate Input Gate that became available.
+        */
+       void notifyInputGateNonEmpty(InputGate inputGate);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 6fcd2f9..b34dbff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,18 +18,16 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -37,6 +35,7 @@ import scala.Tuple2;
 import java.io.IOException;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -44,11 +43,13 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 /**
  * An input channel, which requests a local subpartition.
  */
-public class LocalInputChannel extends InputChannel implements 
NotificationListener {
+public class LocalInputChannel extends InputChannel implements 
BufferAvailabilityListener {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(LocalInputChannel.class);
 
-       private final Object requestLock = new Object();
+       // 
------------------------------------------------------------------------
+
+       private final Object requestReleaseLock = new Object();
 
        /** The local partition manager. */
        private final ResultPartitionManager partitionManager;
@@ -56,38 +57,40 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
        /** Task event dispatcher for backwards events. */
        private final TaskEventDispatcher taskEventDispatcher;
 
+       /** Number of available buffers used to keep track of non-empty gate 
notifications. */
+       private final AtomicLong numBuffersAvailable;
+
        /** The consumed subpartition */
        private volatile ResultSubpartitionView subpartitionView;
 
        private volatile boolean isReleased;
 
-       private volatile Buffer lookAhead;
-
        LocalInputChannel(
-                       SingleInputGate inputGate,
-                       int channelIndex,
-                       ResultPartitionID partitionId,
-                       ResultPartitionManager partitionManager,
-                       TaskEventDispatcher taskEventDispatcher,
-                       IOMetricGroup metrics) {
+               SingleInputGate inputGate,
+               int channelIndex,
+               ResultPartitionID partitionId,
+               ResultPartitionManager partitionManager,
+               TaskEventDispatcher taskEventDispatcher,
+               IOMetricGroup metrics) {
 
                this(inputGate, channelIndex, partitionId, partitionManager, 
taskEventDispatcher,
-                               new Tuple2<Integer, Integer>(0, 0), metrics);
+                       new Tuple2<Integer, Integer>(0, 0), metrics);
        }
 
        LocalInputChannel(
-                       SingleInputGate inputGate,
-                       int channelIndex,
-                       ResultPartitionID partitionId,
-                       ResultPartitionManager partitionManager,
-                       TaskEventDispatcher taskEventDispatcher,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff,
-                       IOMetricGroup metrics) {
+               SingleInputGate inputGate,
+               int channelIndex,
+               ResultPartitionID partitionId,
+               ResultPartitionManager partitionManager,
+               TaskEventDispatcher taskEventDispatcher,
+               Tuple2<Integer, Integer> initialAndMaxBackoff,
+               IOMetricGroup metrics) {
 
                super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, metrics.getNumBytesInLocalCounter());
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
+               this.numBuffersAvailable = new AtomicLong();
        }
 
        // 
------------------------------------------------------------------------
@@ -97,30 +100,36 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
        @Override
        void requestSubpartition(int subpartitionIndex) throws IOException, 
InterruptedException {
                // The lock is required to request only once in the presence of 
retriggered requests.
-               synchronized (requestLock) {
+               synchronized (requestReleaseLock) {
+                       checkState(!isReleased, "released");
+
                        if (subpartitionView == null) {
                                LOG.debug("{}: Requesting LOCAL subpartition {} 
of partition {}.",
-                                               this, subpartitionIndex, 
partitionId);
+                                       this, subpartitionIndex, partitionId);
 
                                try {
-                                       subpartitionView = 
partitionManager.createSubpartitionView(
-                                                       partitionId, 
subpartitionIndex, inputGate.getBufferProvider());
-                               }
-                               catch (PartitionNotFoundException notFound) {
+                                       ResultSubpartitionView subpartitionView 
= partitionManager.createSubpartitionView(
+                                               partitionId, subpartitionIndex, 
inputGate.getBufferProvider(), this);
+
+                                       if (subpartitionView == null) {
+                                               throw new IOException("Error 
requesting subpartition.");
+                                       }
+
+                                       // make the subpartition view visible
+                                       this.subpartitionView = 
subpartitionView;
+
+                                       // check if the channel was released in 
the meantime
+                                       if (isReleased) {
+                                               
subpartitionView.releaseAllResources();
+                                               this.subpartitionView = null;
+                                       }
+                               } catch (PartitionNotFoundException notFound) {
                                        if (increaseBackoff()) {
                                                
inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
-                                               return;
-                                       }
-                                       else {
+                                       } else {
                                                throw notFound;
                                        }
                                }
-
-                               if (subpartitionView == null) {
-                                       throw new IOException("Error requesting 
subpartition.");
-                               }
-
-                               getNextLookAhead();
                        }
                }
        }
@@ -128,17 +137,16 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
        /**
         * Retriggers a subpartition request.
         */
-       void retriggerSubpartitionRequest(Timer timer, final int 
subpartitionIndex) throws IOException, InterruptedException {
-               synchronized (requestLock) {
-                       checkState(subpartitionView == null, "Already requested 
partition.");
+       void retriggerSubpartitionRequest(Timer timer, final int 
subpartitionIndex) {
+               synchronized (requestReleaseLock) {
+                       checkState(subpartitionView == null, "already requested 
partition");
 
                        timer.schedule(new TimerTask() {
                                @Override
                                public void run() {
                                        try {
                                                
requestSubpartition(subpartitionIndex);
-                                       }
-                                       catch (Throwable t) {
+                                       } catch (Throwable t) {
                                                setError(t);
                                        }
                                }
@@ -147,29 +155,49 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
        }
 
        @Override
-       Buffer getNextBuffer() throws IOException, InterruptedException {
+       BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
                checkError();
-               checkState(subpartitionView != null, "Queried for a buffer 
before requesting the subpartition.");
 
-               // After subscribe notification
-               if (lookAhead == null) {
-                       lookAhead = subpartitionView.getNextBuffer();
+               ResultSubpartitionView subpartitionView = this.subpartitionView;
+               if (subpartitionView == null) {
+                       // this can happen if the request for the partition was 
triggered asynchronously
+                       // by the time trigger
+                       // would be good to avoid that, by guaranteeing that 
the requestPartition() and
+                       // getNextBuffer() always come from the same thread
+                       // we could do that by letting the timer insert a 
special "requesting channel" into the input gate's queue
+                       subpartitionView = checkAndWaitForSubpartitionView();
                }
 
-               Buffer next = lookAhead;
-               lookAhead = null;
+               Buffer next = subpartitionView.getNextBuffer();
+               long remaining = numBuffersAvailable.decrementAndGet();
 
-               if (!next.isBuffer() && EventSerializer
-                               .fromBuffer(next, getClass().getClassLoader())
-                               .getClass() == EndOfPartitionEvent.class) {
-
-                               return next;
+               if (remaining >= 0) {
+                       numBytesIn.inc(next.getSize());
+                       return new BufferAndAvailability(next, remaining > 0);
+               } else if (subpartitionView.isReleased()) {
+                       throw new 
ProducerFailedException(subpartitionView.getFailureCause());
+               } else {
+                       throw new IllegalStateException("No buffer available 
and producer partition not released.");
                }
+       }
 
-               getNextLookAhead();
+       @Override
+       public void notifyBuffersAvailable(long numBuffers) {
+               // if this request made the channel non-empty, notify the input 
gate
+               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
+                       notifyChannelNonEmpty();
+               }
+       }
 
-               numBytesIn.inc(next.getSize());
-               return next;
+       private ResultSubpartitionView checkAndWaitForSubpartitionView() {
+               // synchronizing on the request lock means this blocks until 
the asynchronous request
+               // for the partition view has been completed
+               // by then the subpartition view is visible or the channel is 
released
+               synchronized (requestReleaseLock) {
+                       checkState(!isReleased, "released");
+                       checkState(subpartitionView != null, "Queried for a 
buffer before requesting the subpartition.");
+                       return subpartitionView;
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -208,18 +236,15 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
         */
        @Override
        void releaseAllResources() throws IOException {
-               if (!isReleased) {
-                       if (lookAhead != null) {
-                               lookAhead.recycle();
-                               lookAhead = null;
-                       }
+               synchronized (requestReleaseLock) {
+                       if (!isReleased) {
+                               isReleased = true;
 
-                       if (subpartitionView != null) {
-                               subpartitionView.releaseAllResources();
-                               subpartitionView = null;
+                               if (subpartitionView != null) {
+                                       subpartitionView.releaseAllResources();
+                                       subpartitionView = null;
+                               }
                        }
-
-                       isReleased = true;
                }
        }
 
@@ -227,55 +252,4 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
        public String toString() {
                return "LocalInputChannel [" + partitionId + "]";
        }
-
-       // 
------------------------------------------------------------------------
-       // Queue iterator listener (called by producing or disk I/O thread)
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void onNotification() {
-               if (isReleased) {
-                       return;
-               }
-
-               try {
-                       getNextLookAhead();
-               }
-               catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private void getNextLookAhead() throws IOException, 
InterruptedException {
-
-               final ResultSubpartitionView view = subpartitionView;
-
-               if (view == null) {
-                       return;
-               }
-
-               while (true) {
-                       lookAhead = view.getNextBuffer();
-
-                       if (lookAhead != null) {
-                               notifyAvailableBuffer();
-                               break;
-                       }
-
-                       if (view.registerListener(this)) {
-                               return;
-                       }
-                       else if (view.isReleased()) {
-                               Throwable cause = view.getFailureCause();
-
-                               if (cause != null) {
-                                       setError(new 
ProducerFailedException(cause));
-                               }
-
-                               return;
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 1cd042c..4dc159b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -27,8 +26,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import scala.Tuple2;
 
 import java.io.IOException;
@@ -44,8 +42,6 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class RemoteInputChannel extends InputChannel {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(RemoteInputChannel.class);
-
        /** ID to distinguish this channel from other channels sharing the same 
TCP connection. */
        private final InputChannelID id = new InputChannelID();
 
@@ -77,25 +73,25 @@ public class RemoteInputChannel extends InputChannel {
        private int expectedSequenceNumber = 0;
 
        public RemoteInputChannel(
-                       SingleInputGate inputGate,
-                       int channelIndex,
-                       ResultPartitionID partitionId,
-                       ConnectionID connectionId,
-                       ConnectionManager connectionManager,
-                       IOMetricGroup metrics) {
+               SingleInputGate inputGate,
+               int channelIndex,
+               ResultPartitionID partitionId,
+               ConnectionID connectionId,
+               ConnectionManager connectionManager,
+               IOMetricGroup metrics) {
 
                this(inputGate, channelIndex, partitionId, connectionId, 
connectionManager,
-                               new Tuple2<Integer, Integer>(0, 0), metrics);
+                       new Tuple2<Integer, Integer>(0, 0), metrics);
        }
 
        public RemoteInputChannel(
-                       SingleInputGate inputGate,
-                       int channelIndex,
-                       ResultPartitionID partitionId,
-                       ConnectionID connectionId,
-                       ConnectionManager connectionManager,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff,
-                       IOMetricGroup metrics) {
+               SingleInputGate inputGate,
+               int channelIndex,
+               ResultPartitionID partitionId,
+               ConnectionID connectionId,
+               ConnectionManager connectionManager,
+               Tuple2<Integer, Integer> initialAndMaxBackoff,
+               IOMetricGroup metrics) {
 
                super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
 
@@ -115,7 +111,7 @@ public class RemoteInputChannel extends InputChannel {
                if (partitionRequestClient == null) {
                        // Create a client and request the partition
                        partitionRequestClient = connectionManager
-                                       
.createPartitionRequestClient(connectionId);
+                               .createPartitionRequestClient(connectionId);
 
                        partitionRequestClient.requestSubpartition(partitionId, 
subpartitionIndex, this, 0);
                }
@@ -129,31 +125,29 @@ public class RemoteInputChannel extends InputChannel {
 
                if (increaseBackoff()) {
                        partitionRequestClient.requestSubpartition(
-                                       partitionId, subpartitionIndex, this, 
getCurrentBackoff());
-               }
-               else {
+                               partitionId, subpartitionIndex, this, 
getCurrentBackoff());
+               } else {
                        failPartitionRequest();
                }
        }
 
        @Override
-       Buffer getNextBuffer() throws IOException {
+       BufferAndAvailability getNextBuffer() throws IOException {
                checkState(!isReleased.get(), "Queried for a buffer after 
channel has been closed.");
                checkState(partitionRequestClient != null, "Queried for a 
buffer before requesting a queue.");
 
                checkError();
 
-               synchronized (receivedBuffers) {
-                       Buffer buffer = receivedBuffers.poll();
-
-                       // Sanity check that channel is only queried after a 
notification
-                       if (buffer == null) {
-                               throw new IOException("Queried input channel 
for data although non is available.");
-                       }
+               final Buffer next;
+               final int remaining;
 
-                       numBytesIn.inc(buffer.getSize());
-                       return buffer;
+               synchronized (receivedBuffers) {
+                       next = receivedBuffers.poll();
+                       remaining = receivedBuffers.size();
                }
+
+               numBytesIn.inc(next.getSize());
+               return new BufferAndAvailability(next, remaining > 0);
        }
 
        // 
------------------------------------------------------------------------
@@ -201,8 +195,7 @@ public class RemoteInputChannel extends InputChannel {
                        // buffers received concurrently with closing are 
properly recycled.
                        if (partitionRequestClient != null) {
                                partitionRequestClient.close(this);
-                       }
-                       else {
+                       } else {
                                
connectionManager.closeOpenChannelConnections(connectionId);
                        }
                }
@@ -246,20 +239,22 @@ public class RemoteInputChannel extends InputChannel {
                        synchronized (receivedBuffers) {
                                if (!isReleased.get()) {
                                        if (expectedSequenceNumber == 
sequenceNumber) {
+                                               int available = 
receivedBuffers.size();
+
                                                receivedBuffers.add(buffer);
                                                expectedSequenceNumber++;
 
-                                               notifyAvailableBuffer();
+                                               if (available == 0) {
+                                                       notifyChannelNonEmpty();
+                                               }
 
                                                success = true;
-                                       }
-                                       else {
+                                       } else {
                                                onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                                        }
                                }
                        }
-               }
-               finally {
+               } finally {
                        if (!success) {
                                buffer.recycle();
                        }
@@ -271,8 +266,7 @@ public class RemoteInputChannel extends InputChannel {
                        if (!isReleased.get()) {
                                if (expectedSequenceNumber == sequenceNumber) {
                                        expectedSequenceNumber++;
-                               }
-                               else {
+                               } else {
                                        onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                                }
                        }
@@ -303,7 +297,7 @@ public class RemoteInputChannel extends InputChannel {
                @Override
                public String getMessage() {
                        return String.format("Buffer re-ordering: expected 
buffer with sequence number %d, but received %d.",
-                                       expectedSequenceNumber, 
actualSequenceNumber);
+                               expectedSequenceNumber, actualSequenceNumber);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 212aade..105d28b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -21,7 +21,6 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -36,23 +35,22 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -137,7 +135,7 @@ public class SingleInputGate implements InputGate {
        private final Map<IntermediateResultPartitionID, InputChannel> 
inputChannels;
 
        /** Channels, which notified this input gate about available data. */
-       private final BlockingQueue<InputChannel> inputChannelsWithData = new 
LinkedBlockingQueue<InputChannel>();
+       private final ArrayDeque<InputChannel> inputChannelsWithData = new 
ArrayDeque<>();
 
        private final BitSet channelsWithEndOfPartitionEvents;
 
@@ -159,9 +157,9 @@ public class SingleInputGate implements InputGate {
        private volatile boolean isReleased;
 
        /** Registered listener to forward buffer notifications to. */
-       private volatile EventListener<InputGate> registeredListener;
+       private volatile InputGateListener inputGateListener;
 
-       private final List<TaskEvent> pendingEvents = new 
ArrayList<TaskEvent>();
+       private final List<TaskEvent> pendingEvents = new ArrayList<>();
 
        private int numberOfUninitializedChannels;
 
@@ -213,6 +211,10 @@ public class SingleInputGate implements InputGate {
                return bufferPool;
        }
 
+       public BufferPool getBufferPool() {
+               return bufferPool;
+       }
+
        @Override
        public int getPageSize() {
                if (bufferPool != null) {
@@ -240,7 +242,7 @@ public class SingleInputGate implements InputGate {
                this.bufferPool = checkNotNull(bufferPool);
        }
 
-       public void setInputChannel(IntermediateResultPartitionID partitionId, 
InputChannel inputChannel) {
+       void setInputChannel(IntermediateResultPartitionID partitionId, 
InputChannel inputChannel) {
                synchronized (requestLock) {
                        if (inputChannels.put(checkNotNull(partitionId), 
checkNotNull(inputChannel)) == null
                                        && inputChannel.getClass() == 
UnknownInputChannel.class) {
@@ -332,6 +334,7 @@ public class SingleInputGate implements InputGate {
        }
 
        public void releaseAllResources() throws IOException {
+               boolean released = false;
                synchronized (requestLock) {
                        if (!isReleased) {
                                try {
@@ -358,9 +361,16 @@ public class SingleInputGate implements InputGate {
                                }
                                finally {
                                        isReleased = true;
+                                       released = true;
                                }
                        }
                }
+
+               if (released) {
+                       synchronized (inputChannelsWithData) {
+                               inputChannelsWithData.notifyAll();
+                       }
+               }
        }
 
        @Override
@@ -406,32 +416,50 @@ public class SingleInputGate implements InputGate {
 
        @Override
        public BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException {
-
                if (hasReceivedAllEndOfPartitionEvents) {
                        return null;
                }
 
+               if (isReleased) {
+                       throw new IllegalStateException("Released");
+               }
+
                requestPartitions();
 
-               InputChannel currentChannel = null;
-               while (currentChannel == null) {
-                       if (isReleased) {
-                               throw new IllegalStateException("Released");
+               InputChannel currentChannel;
+               boolean moreAvailable;
+
+               synchronized (inputChannelsWithData) {
+                       while (inputChannelsWithData.size() == 0) {
+                               if (isReleased) {
+                                       throw new 
IllegalStateException("Released");
+                               }
+
+                               inputChannelsWithData.wait();
                        }
 
-                       currentChannel = inputChannelsWithData.poll(2, 
TimeUnit.SECONDS);
+                       currentChannel = inputChannelsWithData.remove();
+                       moreAvailable = inputChannelsWithData.size() > 0;
                }
 
-               final Buffer buffer = currentChannel.getNextBuffer();
+               final BufferAndAvailability result = 
currentChannel.getNextBuffer();
 
                // Sanity check that notifications only happen when data is 
available
-               if (buffer == null) {
+               if (result == null) {
                        throw new IllegalStateException("Bug in input 
gate/channel logic: input gate got " +
                                        "notified by channel about available 
data, but none was available.");
                }
 
+               // this channel was now removed from the non-empty channels 
queue
+               // we re-add it in case it has more data, because in that case 
no "non-empty" notification
+               // will come for that channel
+               if (result.moreAvailable()) {
+                       queueChannel(currentChannel);
+               }
+
+               final Buffer buffer = result.buffer();
                if (buffer.isBuffer()) {
-                       return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex());
+                       return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable);
                }
                else {
                        final AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -448,7 +476,7 @@ public class SingleInputGate implements InputGate {
                                currentChannel.releaseAllResources();
                        }
 
-                       return new BufferOrEvent(event, 
currentChannel.getChannelIndex());
+                       return new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable);
                }
        }
 
@@ -470,21 +498,16 @@ public class SingleInputGate implements InputGate {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void registerListener(EventListener<InputGate> listener) {
-               if (registeredListener == null) {
-                       registeredListener = listener;
-               }
-               else {
+       public void registerListener(InputGateListener inputGateListener) {
+               if (this.inputGateListener == null) {
+                       this.inputGateListener = inputGateListener;
+               } else {
                        throw new IllegalStateException("Multiple listeners");
                }
        }
 
-       public void onAvailableBuffer(InputChannel channel) {
-               inputChannelsWithData.add(channel);
-               EventListener<InputGate> listener = registeredListener;
-               if (listener != null) {
-                       listener.onEvent(this);
-               }
+       void notifyChannelNonEmpty(InputChannel channel) {
+               queueChannel(checkNotNull(channel));
        }
 
        void triggerPartitionStateCheck(ResultPartitionID partitionId) {
@@ -495,6 +518,27 @@ public class SingleInputGate implements InputGate {
                                partitionId);
        }
 
+       private void queueChannel(InputChannel channel) {
+               int availableChannels;
+
+               synchronized (inputChannelsWithData) {
+                       availableChannels = inputChannelsWithData.size();
+
+                       inputChannelsWithData.add(channel);
+
+                       if (availableChannels == 0) {
+                               inputChannelsWithData.notify();
+                       }
+               }
+
+               if (availableChannels == 0) {
+                       InputGateListener listener = inputGateListener;
+                       if (listener != null) {
+                               listener.notifyInputGateNonEmpty(this);
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index b1b8911..e8ccbb4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -22,15 +22,11 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.ArrayDeque;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,19 +59,22 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It is possible to recursively union union input gates.
  */
-public class UnionInputGate implements InputGate {
+public class UnionInputGate implements InputGate, InputGateListener {
 
        /** The input gates to union. */
        private final InputGate[] inputGates;
 
        private final Set<InputGate> inputGatesWithRemainingData;
 
-       /** Data availability listener across all unioned input gates. */
-       private final InputGateListener inputGateListener;
+       /** Gates, which notified this input gate about available data. */
+       private final ArrayDeque<InputGate> inputGatesWithData = new 
ArrayDeque<>();
 
        /** The total number of input channels across all unioned input gates. 
*/
        private final int totalNumberOfInputChannels;
 
+       /** Registered listener to forward input gate notifications to. */
+       private volatile InputGateListener inputGateListener;
+
        /**
         * A mapping from input gate to (logical) channel index offset. Valid 
channel indexes go from 0
         * (inclusive) to the total number of input channels (exclusive).
@@ -100,11 +99,12 @@ public class UnionInputGate implements InputGate {
                        inputGatesWithRemainingData.add(inputGate);
 
                        currentNumberOfInputChannels += 
inputGate.getNumberOfInputChannels();
+
+                       // Register the union gate as a listener for all input 
gates
+                       inputGate.registerListener(this);
                }
 
                this.totalNumberOfInputChannels = currentNumberOfInputChannels;
-
-               this.inputGateListener = new InputGateListener(inputGates, 
this);
        }
 
        /**
@@ -139,7 +139,6 @@ public class UnionInputGate implements InputGate {
 
        @Override
        public BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException {
-
                if (inputGatesWithRemainingData.isEmpty()) {
                        return null;
                }
@@ -147,17 +146,31 @@ public class UnionInputGate implements InputGate {
                // Make sure to request the partitions, if they have not been 
requested before.
                requestPartitions();
 
-               final InputGate inputGate = 
inputGateListener.getNextInputGateToReadFrom();
+               final InputGate inputGate;
+               synchronized (inputGatesWithData) {
+                       while (inputGatesWithData.size() == 0) {
+                               inputGatesWithData.wait();
+                       }
+
+                       inputGate = inputGatesWithData.remove();
+               }
 
                final BufferOrEvent bufferOrEvent = 
inputGate.getNextBufferOrEvent();
 
+               if (bufferOrEvent.moreAvailable()) {
+                       // this buffer or event was now removed from the 
non-empty gates queue
+                       // we re-add it in case it has more data, because in 
that case no "non-empty" notification
+                       // will come for that gate
+                       queueInputGate(inputGate);
+               }
+
                if (bufferOrEvent.isEvent()
-                               && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
-                               && inputGate.isFinished()) {
+                       && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
+                       && inputGate.isFinished()) {
 
                        if (!inputGatesWithRemainingData.remove(inputGate)) {
                                throw new IllegalStateException("Couldn't find 
input gate in set of remaining " +
-                                               "input gates.");
+                                       "input gates.");
                        }
                }
 
@@ -177,9 +190,12 @@ public class UnionInputGate implements InputGate {
        }
 
        @Override
-       public void registerListener(EventListener<InputGate> listener) {
-               // This method is called from the consuming task thread.
-               inputGateListener.registerListener(listener);
+       public void registerListener(InputGateListener listener) {
+               if (this.inputGateListener == null) {
+                       this.inputGateListener = listener;
+               } else {
+                       throw new IllegalStateException("Multiple listeners");
+               }
        }
 
        @Override
@@ -195,45 +211,29 @@ public class UnionInputGate implements InputGate {
                return pageSize;
        }
 
-       /**
-        * Data availability listener at all unioned input gates.
-        *
-        * <p> The listener registers itself at each input gate and is notified 
for *each incoming
-        * buffer* at one of the unioned input gates.
-        */
-       private static class InputGateListener implements 
EventListener<InputGate> {
-
-               private final UnionInputGate unionInputGate;
-
-               private final BlockingQueue<InputGate> inputGatesWithData = new 
LinkedBlockingQueue<InputGate>();
+       @Override
+       public void notifyInputGateNonEmpty(InputGate inputGate) {
+               queueInputGate(checkNotNull(inputGate));
+       }
 
-               private final List<EventListener<InputGate>> 
registeredListeners = new CopyOnWriteArrayList<EventListener<InputGate>>();
+       private void queueInputGate(InputGate inputGate) {
+               int availableInputGates;
 
-               public InputGateListener(InputGate[] inputGates, UnionInputGate 
unionInputGate) {
-                       for (InputGate inputGate : inputGates) {
-                               inputGate.registerListener(this);
-                       }
+               synchronized (inputGatesWithData) {
+                       availableInputGates = inputGatesWithData.size();
 
-                       this.unionInputGate = unionInputGate;
-               }
-
-               @Override
-               public void onEvent(InputGate inputGate) {
-                       // This method is called from the input channel thread, 
which can be either the same
-                       // thread as the consuming task thread or a different 
one.
                        inputGatesWithData.add(inputGate);
 
-                       for (int i = 0; i < registeredListeners.size(); i++) {
-                               
registeredListeners.get(i).onEvent(unionInputGate);
+                       if (availableInputGates == 0) {
+                               inputGatesWithData.notify();
                        }
                }
 
-               InputGate getNextInputGateToReadFrom() throws 
InterruptedException {
-                       return inputGatesWithData.take();
-               }
-
-               public void registerListener(EventListener<InputGate> listener) 
{
-                       registeredListeners.add(checkNotNull(listener));
+               if (availableInputGates == 0) {
+                       InputGateListener listener = inputGateListener;
+                       if (listener != null) {
+                               listener.notifyInputGateNonEmpty(this);
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index cc91e83..0c02112 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -23,8 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import scala.Tuple2;
@@ -75,9 +73,9 @@ public class UnknownInputChannel extends InputChannel {
        }
 
        @Override
-       public Buffer getNextBuffer() throws IOException {
+       public BufferAndAvailability getNextBuffer() throws IOException {
                // Nothing to do here
-               return null;
+               throw new UnsupportedOperationException("Cannot retrieve a 
buffer from an UnknownInputChannel");
        }
 
        @Override
@@ -90,8 +88,7 @@ public class UnknownInputChannel extends InputChannel {
         * <p>
         * <strong>Important</strong>: It is important that the method correctly
         * always <code>false</code> for unknown input channels in order to not
-        * finish the consumption of an intermediate result partition early in
-        * {@link BufferReader}.
+        * finish the consumption of an intermediate result partition early.
         */
        @Override
        public boolean isReleased() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfce175/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 514a8d2..9565666 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -337,7 +337,6 @@ public class Task implements Runnable {
                                        
networkEnvironment.getPartitionManager(),
                                        
networkEnvironment.getPartitionConsumableNotifier(),
                                        ioManager,
-                                       networkEnvironment.getDefaultIOMode(),
                                        
desc.sendScheduleOrUpdateConsumersMessage());
 
                        writers[counter] = new 
ResultPartitionWriter(producedPartitions[counter]);

Reply via email to