This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd1eb082fdd291523ec931a01b295e407a2df1f5
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri May 10 15:22:23 2019 +0200

    Revert "Merge pull request #8361 from pnowojski/f12434"
    
    This reverts commit d3fd7a6794a8ffd16e00bb1867b6e62c3909a2d2.
---
 .../io/network/partition/consumer/InputGate.java   |  55 +-----
 .../partition/consumer/InputGateListener.java      |  35 ++++
 .../partition/consumer/SingleInputGate.java        |  87 ++++-----
 .../network/partition/consumer/UnionInputGate.java | 206 ++++++++++-----------
 .../partition/consumer/InputGateTestBase.java      |  91 ---------
 .../partition/consumer/SingleInputGateTest.java    |  54 ++++--
 .../partition/consumer/TestSingleInputGate.java    |  44 ++++-
 .../partition/consumer/UnionInputGateTest.java     |  22 +--
 .../consumer/StreamTestSingleInputGate.java        |  84 +--------
 .../runtime/io/BarrierBufferMassiveRandomTest.java |   7 +-
 .../flink/streaming/runtime/io/MockInputGate.java  |   9 +-
 11 files changed, 281 insertions(+), 413 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 03ac822..c270d37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -22,9 +22,6 @@ import org.apache.flink.runtime.event.TaskEvent;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An input gate consumes one or more partitions of a single produced 
intermediate result.
@@ -68,65 +65,33 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * will have an input gate attached to it. This will provide its input, which 
will consist of one
  * subpartition from each partition of the intermediate result.
  */
-public abstract class InputGate implements AutoCloseable {
-
-       public static final CompletableFuture<?> AVAILABLE = 
CompletableFuture.completedFuture(null);
-
-       protected CompletableFuture<?> isAvailable = new CompletableFuture<>();
+public interface InputGate extends AutoCloseable {
 
-       public abstract int getNumberOfInputChannels();
+       int getNumberOfInputChannels();
 
-       public abstract String getOwningTaskName();
+       String getOwningTaskName();
 
-       public abstract boolean isFinished();
+       boolean isFinished();
 
-       public abstract void requestPartitions() throws IOException, 
InterruptedException;
+       void requestPartitions() throws IOException, InterruptedException;
 
        /**
         * Blocking call waiting for next {@link BufferOrEvent}.
         *
         * @return {@code Optional.empty()} if {@link #isFinished()} returns 
true.
         */
-       public abstract Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException;
+       Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, 
InterruptedException;
 
        /**
         * Poll the {@link BufferOrEvent}.
         *
         * @return {@code Optional.empty()} if there is no data to return or if 
{@link #isFinished()} returns true.
         */
-       public abstract Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
IOException, InterruptedException;
-
-       public abstract void sendTaskEvent(TaskEvent event) throws IOException;
-
-       public abstract int getPageSize();
-
-       /**
-        * @return a future that is completed if there are more records 
available. If there more records
-        * available immediately, {@link #AVAILABLE} should be returned.
-        */
-       public CompletableFuture<?> isAvailable() {
-               return isAvailable;
-       }
+       Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, 
InterruptedException;
 
-       protected void resetIsAvailable() {
-               // try to avoid volatile access in isDone()}
-               if (isAvailable == AVAILABLE || isAvailable.isDone()) {
-                       isAvailable = new CompletableFuture<>();
-               }
-       }
+       void sendTaskEvent(TaskEvent event) throws IOException;
 
-       /**
-        * Simple pojo for INPUT, DATA and moreAvailable.
-        */
-       protected static class InputWithData<INPUT, DATA> {
-               protected final INPUT input;
-               protected final DATA data;
-               protected final boolean moreAvailable;
+       void registerListener(InputGateListener listener);
 
-               InputWithData(INPUT input, DATA data, boolean moreAvailable) {
-                       this.input = checkNotNull(input);
-                       this.data = checkNotNull(data);
-                       this.moreAvailable = moreAvailable;
-               }
-       }
+       int getPageSize();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
new file mode 100644
index 0000000..00fa782
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+/**
+ * Listener interface implemented by consumers of {@link InputGate} instances
+ * that want to be notified of availability of buffer or event instances.
+ */
+public interface InputGateListener {
+
+       /**
+        * Notification callback if the input gate moves from zero to non-zero
+        * available input channels with data.
+        *
+        * @param inputGate Input Gate that became available.
+        */
+       void notifyInputGateNonEmpty(InputGate inputGate);
+
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 19912b2..d40af83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Timer;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -102,7 +101,7 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * in two partitions (Partition 1 and 2). Each of these partitions is further 
partitioned into two
  * subpartitions -- one for each parallel reduce subtask.
  */
-public class SingleInputGate extends InputGate {
+public class SingleInputGate implements InputGate {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SingleInputGate.class);
 
@@ -173,6 +172,9 @@ public class SingleInputGate extends InputGate {
        /** Flag indicating whether all resources have been released. */
        private volatile boolean isReleased;
 
+       /** Registered listener to forward buffer notifications to. */
+       private volatile InputGateListener inputGateListener;
+
        private final List<TaskEvent> pendingEvents = new ArrayList<>();
 
        private int numberOfUninitializedChannels;
@@ -529,21 +531,12 @@ public class SingleInputGate extends InputGate {
                }
 
                requestPartitions();
-               Optional<InputWithData<InputChannel, BufferAndAvailability>> 
next = waitAndGetNextData(blocking);
-               if (!next.isPresent()) {
-                       return Optional.empty();
-               }
 
-               InputWithData<InputChannel, BufferAndAvailability> 
inputWithData = next.get();
-               return Optional.of(transformToBufferOrEvent(
-                       inputWithData.data.buffer(),
-                       inputWithData.moreAvailable,
-                       inputWithData.input));
-       }
+               InputChannel currentChannel;
+               boolean moreAvailable;
+               Optional<BufferAndAvailability> result = Optional.empty();
 
-       private Optional<InputWithData<InputChannel, BufferAndAvailability>> 
waitAndGetNextData(boolean blocking)
-                       throws IOException, InterruptedException {
-               while (true) {
+               do {
                        synchronized (inputChannelsWithData) {
                                while (inputChannelsWithData.size() == 0) {
                                        if (isReleased) {
@@ -554,43 +547,30 @@ public class SingleInputGate extends InputGate {
                                                inputChannelsWithData.wait();
                                        }
                                        else {
-                                               resetIsAvailable();
                                                return Optional.empty();
                                        }
                                }
 
-                               InputChannel inputChannel = 
inputChannelsWithData.remove();
-
-                               Optional<BufferAndAvailability> result = 
inputChannel.getNextBuffer();
+                               currentChannel = inputChannelsWithData.remove();
+                               
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+                               moreAvailable = 
!inputChannelsWithData.isEmpty();
+                       }
 
-                               if (result.isPresent() && 
result.get().moreAvailable()) {
-                                       // enqueue the inputChannel at the end 
to avoid starvation
-                                       inputChannelsWithData.add(inputChannel);
-                               } else {
-                                       
enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
-                               }
+                       result = currentChannel.getNextBuffer();
+               } while (!result.isPresent());
 
-                               if (inputChannelsWithData.isEmpty()) {
-                                       resetIsAvailable();
-                               }
-
-                               if (result.isPresent()) {
-                                       return Optional.of(new InputWithData<>(
-                                               inputChannel,
-                                               result.get(),
-                                               
!inputChannelsWithData.isEmpty()));
-                               }
-                       }
+               // this channel was now removed from the non-empty channels 
queue
+               // we re-add it in case it has more data, because in that case 
no "non-empty" notification
+               // will come for that channel
+               if (result.get().moreAvailable()) {
+                       queueChannel(currentChannel);
+                       moreAvailable = true;
                }
-       }
 
-       private BufferOrEvent transformToBufferOrEvent(
-                       Buffer buffer,
-                       boolean moreAvailable,
-                       InputChannel currentChannel) throws IOException, 
InterruptedException {
+               final Buffer buffer = result.get().buffer();
                numBytesIn.inc(buffer.getSizeUnsafe());
                if (buffer.isBuffer()) {
-                       return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable);
+                       return Optional.of(new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable));
                }
                else {
                        final AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -609,10 +589,11 @@ public class SingleInputGate extends InputGate {
                                }
 
                                currentChannel.notifySubpartitionConsumed();
+
                                currentChannel.releaseAllResources();
                        }
 
-                       return new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable);
+                       return Optional.of(new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable));
                }
        }
 
@@ -633,6 +614,15 @@ public class SingleInputGate extends InputGate {
        // Channel notifications
        // 
------------------------------------------------------------------------
 
+       @Override
+       public void registerListener(InputGateListener inputGateListener) {
+               if (this.inputGateListener == null) {
+                       this.inputGateListener = inputGateListener;
+               } else {
+                       throw new IllegalStateException("Multiple listeners");
+               }
+       }
+
        void notifyChannelNonEmpty(InputChannel channel) {
                queueChannel(checkNotNull(channel));
        }
@@ -644,8 +634,6 @@ public class SingleInputGate extends InputGate {
        private void queueChannel(InputChannel channel) {
                int availableChannels;
 
-               CompletableFuture<?> toNotify = null;
-
                synchronized (inputChannelsWithData) {
                        if 
(enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
                                return;
@@ -657,13 +645,14 @@ public class SingleInputGate extends InputGate {
 
                        if (availableChannels == 0) {
                                inputChannelsWithData.notifyAll();
-                               toNotify = isAvailable;
-                               isAvailable = AVAILABLE;
                        }
                }
 
-               if (toNotify != null) {
-                       toNotify.complete(null);
+               if (availableChannels == 0) {
+                       InputGateListener listener = inputGateListener;
+                       if (listener != null) {
+                               listener.notifyInputGateNonEmpty(this);
+                       }
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 5019cfc..ea83004 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -25,12 +25,11 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
+import java.util.ArrayDeque;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,22 +63,27 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  *
  * <strong>It is NOT possible to recursively union union input gates.</strong>
  */
-public class UnionInputGate extends InputGate {
+public class UnionInputGate implements InputGate, InputGateListener {
 
        /** The input gates to union. */
        private final InputGate[] inputGates;
 
        private final Set<InputGate> inputGatesWithRemainingData;
 
+       /** Gates, which notified this input gate about available data. */
+       private final ArrayDeque<InputGate> inputGatesWithData = new 
ArrayDeque<>();
+
        /**
-        * Gates, which notified this input gate about available data. We are 
using it as a FIFO
-        * queue of {@link InputGate}s to avoid starvation and provide some 
basic fairness.
+        * Guardian against enqueuing an {@link InputGate} multiple times on 
{@code inputGatesWithData}.
         */
-       private final LinkedHashSet<InputGate> inputGatesWithData = new 
LinkedHashSet<>();
+       private final Set<InputGate> enqueuedInputGatesWithData = new 
HashSet<>();
 
        /** The total number of input channels across all unioned input gates. 
*/
        private final int totalNumberOfInputChannels;
 
+       /** Registered listener to forward input gate notifications to. */
+       private volatile InputGateListener inputGateListener;
+
        /**
         * A mapping from input gate to (logical) channel index offset. Valid 
channel indexes go from 0
         * (inclusive) to the total number of input channels (exclusive).
@@ -98,31 +102,20 @@ public class UnionInputGate extends InputGate {
 
                int currentNumberOfInputChannels = 0;
 
-               synchronized (inputGatesWithData) {
-                       for (InputGate inputGate : inputGates) {
-                               if (inputGate instanceof UnionInputGate) {
-                                       // if we want to add support for this, 
we need to implement pollNextBufferOrEvent()
-                                       throw new 
UnsupportedOperationException("Cannot union a union of input gates.");
-                               }
-
-                               // The offset to use for buffer or event 
instances received from this input gate.
-                               
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), 
currentNumberOfInputChannels);
-                               inputGatesWithRemainingData.add(inputGate);
-
-                               currentNumberOfInputChannels += 
inputGate.getNumberOfInputChannels();
+               for (InputGate inputGate : inputGates) {
+                       if (inputGate instanceof UnionInputGate) {
+                               // if we want to add support for this, we need 
to implement pollNextBufferOrEvent()
+                               throw new UnsupportedOperationException("Cannot 
union a union of input gates.");
+                       }
 
-                               CompletableFuture<?> available = 
inputGate.isAvailable();
+                       // The offset to use for buffer or event instances 
received from this input gate.
+                       inputGateToIndexOffsetMap.put(checkNotNull(inputGate), 
currentNumberOfInputChannels);
+                       inputGatesWithRemainingData.add(inputGate);
 
-                               if (available.isDone()) {
-                                       inputGatesWithData.add(inputGate);
-                               } else {
-                                       available.thenRun(() -> 
queueInputGate(inputGate));
-                               }
-                       }
+                       currentNumberOfInputChannels += 
inputGate.getNumberOfInputChannels();
 
-                       if (!inputGatesWithData.isEmpty()) {
-                               isAvailable = AVAILABLE;
-                       }
+                       // Register the union gate as a listener for all input 
gates
+                       inputGate.registerListener(this);
                }
 
                this.totalNumberOfInputChannels = currentNumberOfInputChannels;
@@ -166,15 +159,6 @@ public class UnionInputGate extends InputGate {
 
        @Override
        public Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException {
-               return getNextBufferOrEvent(true);
-       }
-
-       @Override
-       public Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
IOException, InterruptedException {
-               return getNextBufferOrEvent(false);
-       }
-
-       private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) 
throws IOException, InterruptedException {
                if (inputGatesWithRemainingData.isEmpty()) {
                        return Optional.empty();
                }
@@ -182,87 +166,75 @@ public class UnionInputGate extends InputGate {
                // Make sure to request the partitions, if they have not been 
requested before.
                requestPartitions();
 
-               Optional<InputWithData<InputGate, BufferOrEvent>> next = 
waitAndGetNextData(blocking);
-               if (!next.isPresent()) {
-                       return Optional.empty();
-               }
-
-               InputWithData<InputGate, BufferOrEvent> inputWithData = 
next.get();
-
-               handleEndOfPartitionEvent(inputWithData.data, 
inputWithData.input);
-               return Optional.of(adjustForUnionInputGate(
-                       inputWithData.data,
-                       inputWithData.input,
-                       inputWithData.moreAvailable));
-       }
-
-       private Optional<InputWithData<InputGate, BufferOrEvent>> 
waitAndGetNextData(boolean blocking)
-                       throws IOException, InterruptedException {
-               while (true) {
-                       synchronized (inputGatesWithData) {
-                               while (inputGatesWithData.size() == 0) {
-                                       if (blocking) {
-                                               inputGatesWithData.wait();
-                                       } else {
-                                               resetIsAvailable();
-                                               return Optional.empty();
-                                       }
-                               }
+               InputGateWithData inputGateWithData = waitAndGetNextInputGate();
+               InputGate inputGate = inputGateWithData.inputGate;
+               BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
 
-                               Iterator<InputGate> inputGateIterator = 
inputGatesWithData.iterator();
-                               final InputGate inputGate = 
inputGateIterator.next();
-                               inputGateIterator.remove();
-
-                               // In case of inputGatesWithData being 
inaccurate do not block on an empty inputGate, but just poll the data.
-                               Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
-
-                               if (bufferOrEvent.isPresent() && 
bufferOrEvent.get().moreAvailable()) {
-                                       // enqueue the inputGate at the end to 
avoid starvation
-                                       inputGatesWithData.add(inputGate);
-                               } else {
-                                       inputGate.isAvailable().thenRun(() -> 
queueInputGate(inputGate));
-                               }
+               if (bufferOrEvent.moreAvailable()) {
+                       // this buffer or event was now removed from the 
non-empty gates queue
+                       // we re-add it in case it has more data, because in 
that case no "non-empty" notification
+                       // will come for that gate
+                       queueInputGate(inputGate);
+               }
 
-                               if (inputGatesWithData.isEmpty()) {
-                                       resetIsAvailable();
-                               }
+               if (bufferOrEvent.isEvent()
+                       && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
+                       && inputGate.isFinished()) {
 
-                               if (bufferOrEvent.isPresent()) {
-                                       return Optional.of(new InputWithData<>(
-                                               inputGate,
-                                               bufferOrEvent.get(),
-                                               !inputGatesWithData.isEmpty()));
-                               }
+                       checkState(!bufferOrEvent.moreAvailable());
+                       if (!inputGatesWithRemainingData.remove(inputGate)) {
+                               throw new IllegalStateException("Couldn't find 
input gate in set of remaining " +
+                                       "input gates.");
                        }
                }
-       }
 
-       private BufferOrEvent adjustForUnionInputGate(
-               BufferOrEvent bufferOrEvent,
-               InputGate inputGate,
-               boolean moreInputGatesAvailable) {
                // Set the channel index to identify the input channel (across 
all unioned input gates)
                final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
 
                bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
-               bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
moreInputGatesAvailable);
+               bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
inputGateWithData.moreInputGatesAvailable);
 
-               return bufferOrEvent;
+               return Optional.of(bufferOrEvent);
        }
 
-       private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, 
InputGate inputGate) {
-               if (bufferOrEvent.isEvent()
-                       && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
-                       && inputGate.isFinished()) {
+       @Override
+       public Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
UnsupportedOperationException {
+               throw new UnsupportedOperationException();
+       }
 
-                       checkState(!bufferOrEvent.moreAvailable());
-                       if (!inputGatesWithRemainingData.remove(inputGate)) {
-                               throw new IllegalStateException("Couldn't find 
input gate in set of remaining " +
-                                       "input gates.");
+       private InputGateWithData waitAndGetNextInputGate() throws IOException, 
InterruptedException {
+               while (true) {
+                       InputGate inputGate;
+                       boolean moreInputGatesAvailable;
+                       synchronized (inputGatesWithData) {
+                               while (inputGatesWithData.size() == 0) {
+                                       inputGatesWithData.wait();
+                               }
+                               inputGate = inputGatesWithData.remove();
+                               enqueuedInputGatesWithData.remove(inputGate);
+                               moreInputGatesAvailable = 
enqueuedInputGatesWithData.size() > 0;
+                       }
+
+                       // In case of inputGatesWithData being inaccurate do 
not block on an empty inputGate, but just poll the data.
+                       Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
+                       if (bufferOrEvent.isPresent()) {
+                               return new InputGateWithData(inputGate, 
bufferOrEvent.get(), moreInputGatesAvailable);
                        }
                }
        }
 
+       private static class InputGateWithData {
+               private final InputGate inputGate;
+               private final BufferOrEvent bufferOrEvent;
+               private final boolean moreInputGatesAvailable;
+
+               InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent, boolean moreInputGatesAvailable) {
+                       this.inputGate = checkNotNull(inputGate);
+                       this.bufferOrEvent = checkNotNull(bufferOrEvent);
+                       this.moreInputGatesAvailable = moreInputGatesAvailable;
+               }
+       }
+
        @Override
        public void sendTaskEvent(TaskEvent event) throws IOException {
                for (InputGate inputGate : inputGates) {
@@ -271,6 +243,15 @@ public class UnionInputGate extends InputGate {
        }
 
        @Override
+       public void registerListener(InputGateListener listener) {
+               if (this.inputGateListener == null) {
+                       this.inputGateListener = listener;
+               } else {
+                       throw new IllegalStateException("Multiple listeners");
+               }
+       }
+
+       @Override
        public int getPageSize() {
                int pageSize = -1;
                for (InputGate gate : inputGates) {
@@ -287,29 +268,34 @@ public class UnionInputGate extends InputGate {
        public void close() throws IOException {
        }
 
-       private void queueInputGate(InputGate inputGate) {
-               checkNotNull(inputGate);
+       @Override
+       public void notifyInputGateNonEmpty(InputGate inputGate) {
+               queueInputGate(checkNotNull(inputGate));
+       }
 
-               CompletableFuture<?> toNotify = null;
+       private void queueInputGate(InputGate inputGate) {
+               int availableInputGates;
 
                synchronized (inputGatesWithData) {
-                       if (inputGatesWithData.contains(inputGate)) {
+                       if (enqueuedInputGatesWithData.contains(inputGate)) {
                                return;
                        }
 
-                       int availableInputGates = inputGatesWithData.size();
+                       availableInputGates = inputGatesWithData.size();
 
                        inputGatesWithData.add(inputGate);
+                       enqueuedInputGatesWithData.add(inputGate);
 
                        if (availableInputGates == 0) {
                                inputGatesWithData.notifyAll();
-                               toNotify = isAvailable;
-                               isAvailable = AVAILABLE;
                        }
                }
 
-               if (toNotify != null) {
-                       toNotify.complete(null);
+               if (availableInputGates == 0) {
+                       InputGateListener listener = inputGateListener;
+                       if (listener != null) {
+                               listener.notifyInputGateNonEmpty(this);
+                       }
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
deleted file mode 100644
index ddf027c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.consumer;
-
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test base for {@link InputGate}.
- */
-@RunWith(Parameterized.class)
-public abstract class InputGateTestBase {
-
-       @Parameter
-       public boolean enableCreditBasedFlowControl;
-
-       @Parameters(name = "Credit-based = {0}")
-       public static List<Boolean> parameters() {
-               return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
-       }
-
-       protected void testIsAvailable(
-                       InputGate inputGateToTest,
-                       SingleInputGate inputGateToNotify,
-                       TestInputChannel inputChannelWithNewData) throws 
Exception {
-
-               assertFalse(inputGateToTest.isAvailable().isDone());
-               
assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent());
-
-               CompletableFuture<?> isAvailable = 
inputGateToTest.isAvailable();
-
-               assertFalse(inputGateToTest.isAvailable().isDone());
-               
assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent());
-
-               assertEquals(isAvailable, inputGateToTest.isAvailable());
-
-               inputChannelWithNewData.readBuffer();
-               
inputGateToNotify.notifyChannelNonEmpty(inputChannelWithNewData);
-
-               assertTrue(isAvailable.isDone());
-               assertTrue(inputGateToTest.isAvailable().isDone());
-       }
-
-       protected SingleInputGate createInputGate() {
-               return createInputGate(2);
-       }
-
-       protected SingleInputGate createInputGate(int numberOfInputChannels) {
-               return createInputGate(numberOfInputChannels, 
ResultPartitionType.PIPELINED);
-       }
-
-       protected SingleInputGate createInputGate(
-                       int numberOfInputChannels, ResultPartitionType 
partitionType) {
-               SingleInputGate inputGate = createSingleInputGate(
-                       numberOfInputChannels,
-                       partitionType,
-                       enableCreditBasedFlowControl);
-
-               assertEquals(partitionType, 
inputGate.getConsumedPartitionType());
-               return inputGate;
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a6f824d..71e4f5a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -50,13 +50,18 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -75,7 +80,17 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for {@link SingleInputGate}.
  */
-public class SingleInputGateTest extends InputGateTestBase {
+@RunWith(Parameterized.class)
+public class SingleInputGateTest {
+
+       @Parameterized.Parameter
+       public boolean enableCreditBasedFlowControl;
+
+       @Parameterized.Parameters(name = "Credit-based = {0}")
+       public static List<Boolean> parameters() {
+               return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+       }
+
        /**
         * Tests basic correctness of buffer-or-event interleaving and correct 
<code>null</code> return
         * value after receiving all end-of-partition events.
@@ -116,15 +131,6 @@ public class SingleInputGateTest extends InputGateTestBase 
{
                assertTrue(inputGate.isFinished());
        }
 
-       @Test
-       public void testIsAvailable() throws Exception {
-               final SingleInputGate inputGate = createInputGate(1);
-               TestInputChannel inputChannel = new TestInputChannel(inputGate, 
0);
-               inputGate.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel);
-
-               testIsAvailable(inputGate, inputGate, inputChannel);
-       }
-
        @Test(timeout = 120 * 1000)
        public void testIsMoreAvailableReadingFromSingleInputChannel() throws 
Exception {
                // Setup
@@ -540,6 +546,22 @@ public class SingleInputGateTest extends InputGateTestBase 
{
 
        // 
---------------------------------------------------------------------------------------------
 
+       private SingleInputGate createInputGate() {
+               return createInputGate(2);
+       }
+
+       private SingleInputGate createInputGate(int numberOfInputChannels) {
+               return createInputGate(numberOfInputChannels, 
ResultPartitionType.PIPELINED);
+       }
+
+       private SingleInputGate createInputGate(int numberOfInputChannels, 
ResultPartitionType partitionType) {
+               SingleInputGate inputGate = 
createSingleInputGate(numberOfInputChannels, partitionType, 
enableCreditBasedFlowControl);
+
+               assertEquals(partitionType, 
inputGate.getConsumedPartitionType());
+
+               return inputGate;
+       }
+
        private void addUnknownInputChannel(
                        NetworkEnvironment network,
                        SingleInputGate inputGate,
@@ -597,7 +619,17 @@ public class SingleInputGateTest extends InputGateTestBase 
{
                assertEquals(expectedChannelIndex, 
bufferOrEvent.get().getChannelIndex());
                assertEquals(expectedMoreAvailable, 
bufferOrEvent.get().moreAvailable());
                if (!expectedMoreAvailable) {
-                       
assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
+                       try {
+                               
assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
+                       }
+                       catch (UnsupportedOperationException ex) {
+                               /**
+                                * {@link 
UnionInputGate#pollNextBufferOrEvent()} is unsupported at the moment.
+                                */
+                               if (!(inputGate instanceof UnionInputGate)) {
+                                       throw ex;
+                               }
+                       }
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index f60cdb9..464ab7c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,8 +20,17 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayDeque;
+
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 /**
  * A test input gate to mock reading data.
@@ -35,8 +44,39 @@ public class TestSingleInputGate {
        public TestSingleInputGate(int numberOfInputChannels, boolean 
initialize) {
                checkArgument(numberOfInputChannels >= 1);
 
-               inputGate = createSingleInputGate(numberOfInputChannels);
-               inputChannels = new TestInputChannel[numberOfInputChannels];
+               SingleInputGate realGate = 
createSingleInputGate(numberOfInputChannels);
+
+               this.inputGate = spy(realGate);
+
+               // Notify about late registrations (added for 
DataSinkTaskTest#testUnionDataSinkTask).
+               // After merging registerInputOutput and invoke, we have to 
make sure that the test
+               // notifications happen at the expected time. In real programs, 
this is guaranteed by
+               // the instantiation and request partition life cycle.
+               try {
+                       Field f = 
realGate.getClass().getDeclaredField("inputChannelsWithData");
+                       f.setAccessible(true);
+                       final ArrayDeque<InputChannel> notifications = 
(ArrayDeque<InputChannel>) f.get(realGate);
+
+                       doAnswer(new Answer<Void>() {
+                               @Override
+                               public Void answer(InvocationOnMock invocation) 
throws Throwable {
+                                       invocation.callRealMethod();
+
+                                       synchronized (notifications) {
+                                               if (!notifications.isEmpty()) {
+                                                       InputGateListener 
listener = (InputGateListener) invocation.getArguments()[0];
+                                                       
listener.notifyInputGateNonEmpty(inputGate);
+                                               }
+                                       }
+
+                                       return null;
+                               }
+                       
}).when(inputGate).registerListener(any(InputGateListener.class));
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               this.inputChannels = new 
TestInputChannel[numberOfInputChannels];
 
                if (initialize) {
                        for (int i = 0; i < numberOfInputChannels; i++) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 9dca613..082ccec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import org.junit.Test;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -30,7 +29,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for {@link UnionInputGate}.
  */
-public class UnionInputGateTest extends InputGateTestBase {
+public class UnionInputGateTest {
 
        /**
         * Tests basic correctness of buffer-or-event interleaving and correct 
<code>null</code> return
@@ -42,8 +41,8 @@ public class UnionInputGateTest extends InputGateTestBase {
        @Test(timeout = 120 * 1000)
        public void testBasicGetNextLogic() throws Exception {
                // Setup
-               final SingleInputGate ig1 = createInputGate(3);
-               final SingleInputGate ig2 = createInputGate(5);
+               final SingleInputGate ig1 = createSingleInputGate(3);
+               final SingleInputGate ig2 = createSingleInputGate(5);
 
                final UnionInputGate union = new UnionInputGate(new 
SingleInputGate[]{ig1, ig2});
 
@@ -102,17 +101,4 @@ public class UnionInputGateTest extends InputGateTestBase {
                assertTrue(union.isFinished());
                assertFalse(union.getNextBufferOrEvent().isPresent());
        }
-
-       @Test
-       public void testIsAvailable() throws Exception {
-               final SingleInputGate inputGate1 = createInputGate(1);
-               TestInputChannel inputChannel1 = new 
TestInputChannel(inputGate1, 0);
-               inputGate1.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel1);
-
-               final SingleInputGate inputGate2 = createInputGate(1);
-               TestInputChannel inputChannel2 = new 
TestInputChannel(inputGate2, 0);
-               inputGate2.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel2);
-
-               testIsAvailable(new UnionInputGate(inputGate1, inputGate2), 
inputGate1, inputChannel1);
-       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index b9fe84f..dbb81ab 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,16 +21,12 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferListener;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import 
org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -44,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static org.mockito.Mockito.doReturn;
 
 /**
  * Test {@link InputGate} that allows setting multiple channels. Use
@@ -79,7 +76,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                inputQueues = new ConcurrentLinkedQueue[numInputChannels];
 
                setupInputChannels();
-               inputGate.setBufferPool(new NoOpBufferPool(bufferSize));
+               doReturn(bufferSize).when(inputGate).getPageSize();
        }
 
        @SuppressWarnings("unchecked")
@@ -222,81 +219,4 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        return isEvent;
                }
        }
-
-       private static class NoOpBufferPool implements BufferPool {
-               private int bufferSize;
-
-               public NoOpBufferPool(int bufferSize) {
-                       this.bufferSize = bufferSize;
-               }
-
-               @Override
-               public void lazyDestroy() {
-               }
-
-               @Override
-               public int getMemorySegmentSize() {
-                       return bufferSize;
-               }
-
-               @Override
-               public Buffer requestBuffer() throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public Buffer requestBufferBlocking() throws IOException, 
InterruptedException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public BufferBuilder requestBufferBuilderBlocking() throws 
IOException, InterruptedException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public boolean addBufferListener(BufferListener listener) {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public boolean isDestroyed() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int getNumberOfRequiredMemorySegments() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int getMaxNumberOfMemorySegments() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int getNumBuffers() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public void setNumBuffers(int numBuffers) throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int getNumberOfAvailableMemorySegments() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int bestEffortGetNumOfUsedBuffers() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public void recycle(MemorySegment memorySegment) {
-                       throw new UnsupportedOperationException();
-               }
-       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index b1a3ad5..ded52ff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import org.junit.Test;
 
@@ -129,7 +130,7 @@ public class BarrierBufferMassiveRandomTest {
                }
        }
 
-       private static class RandomGeneratingInputGate extends InputGate {
+       private static class RandomGeneratingInputGate implements InputGate {
 
                private final int numberOfChannels;
                private final BufferPool[] bufferPools;
@@ -150,7 +151,6 @@ public class BarrierBufferMassiveRandomTest {
                        this.bufferPools = bufferPools;
                        this.barrierGens = barrierGens;
                        this.owningTaskName = owningTaskName;
-                       this.isAvailable = AVAILABLE;
                }
 
                @Override
@@ -199,6 +199,9 @@ public class BarrierBufferMassiveRandomTest {
                public void sendTaskEvent(TaskEvent event) {}
 
                @Override
+               public void registerListener(InputGateListener listener) {}
+
+               @Override
                public int getPageSize() {
                        return PAGE_SIZE;
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 30941ab..a29cbf5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import java.util.ArrayDeque;
 import java.util.List;
@@ -31,7 +32,7 @@ import java.util.Queue;
 /**
  * Mock {@link InputGate}.
  */
-public class MockInputGate extends InputGate {
+public class MockInputGate implements InputGate {
 
        private final int pageSize;
 
@@ -55,8 +56,6 @@ public class MockInputGate extends InputGate {
                this.bufferOrEvents = new 
ArrayDeque<BufferOrEvent>(bufferOrEvents);
                this.closed = new boolean[numberOfChannels];
                this.owningTaskName = owningTaskName;
-
-               isAvailable = AVAILABLE;
        }
 
        @Override
@@ -112,6 +111,10 @@ public class MockInputGate extends InputGate {
        }
 
        @Override
+       public void registerListener(InputGateListener listener) {
+       }
+
+       @Override
        public void close() {
        }
 }

Reply via email to