This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4527d05c810fed6329bf0789e27cc43bd816e754
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue May 7 14:05:47 2019 +0200

    [FLINK-12434][network] Replace listeners with CompletableFuture in 
InputGates
    
    This commit replaces listeners in InputGates with `CompletableFuture<?> 
isAvailable`.
    Such a change makes the contract simpler, since it:
    - removes the limit of only one possible listener
    - avoids race conditions between registering listeners and firing the notification
    - by design handles a situation when a notification is fired more than once
    - will integrate better with the recently proposed `Input` and 
`SourceReader` interfaces
---
 .../io/network/partition/consumer/InputGate.java   | 38 ++++++++---
 .../partition/consumer/InputGateListener.java      | 35 ----------
 .../partition/consumer/SingleInputGate.java        | 31 ++++-----
 .../network/partition/consumer/UnionInputGate.java | 75 +++++++++++-----------
 .../partition/consumer/InputGateTestBase.java      | 25 ++++++++
 .../partition/consumer/SingleInputGateTest.java    |  9 +++
 .../partition/consumer/TestSingleInputGate.java    | 43 +------------
 .../partition/consumer/UnionInputGateTest.java     | 16 ++++-
 .../runtime/io/BarrierBufferMassiveRandomTest.java |  7 +-
 .../flink/streaming/runtime/io/MockInputGate.java  |  9 +--
 10 files changed, 136 insertions(+), 152 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c270d37..83e18e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * An input gate consumes one or more partitions of a single produced 
intermediate result.
@@ -65,33 +66,50 @@ import java.util.Optional;
  * will have an input gate attached to it. This will provide its input, which 
will consist of one
  * subpartition from each partition of the intermediate result.
  */
-public interface InputGate extends AutoCloseable {
+public abstract class InputGate implements AutoCloseable {
 
-       int getNumberOfInputChannels();
+       public static final CompletableFuture<?> AVAILABLE = 
CompletableFuture.completedFuture(null);
 
-       String getOwningTaskName();
+       protected CompletableFuture<?> isAvailable = new CompletableFuture<>();
 
-       boolean isFinished();
+       public abstract int getNumberOfInputChannels();
 
-       void requestPartitions() throws IOException, InterruptedException;
+       public abstract String getOwningTaskName();
+
+       public abstract boolean isFinished();
+
+       public abstract void requestPartitions() throws IOException, 
InterruptedException;
 
        /**
         * Blocking call waiting for next {@link BufferOrEvent}.
         *
         * @return {@code Optional.empty()} if {@link #isFinished()} returns 
true.
         */
-       Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, 
InterruptedException;
+       public abstract Optional<BufferOrEvent> getNextBufferOrEvent() throws 
IOException, InterruptedException;
 
        /**
         * Poll the {@link BufferOrEvent}.
         *
         * @return {@code Optional.empty()} if there is no data to return or if 
{@link #isFinished()} returns true.
         */
-       Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, 
InterruptedException;
+       public abstract Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
IOException, InterruptedException;
+
+       public abstract void sendTaskEvent(TaskEvent event) throws IOException;
 
-       void sendTaskEvent(TaskEvent event) throws IOException;
+       public abstract int getPageSize();
 
-       void registerListener(InputGateListener listener);
+       /**
+        * @return a future that is completed if there are more records 
available. If there are more records
+        * available immediately, {@link #AVAILABLE} should be returned.
+        */
+       public CompletableFuture<?> isAvailable() {
+               return isAvailable;
+       }
 
-       int getPageSize();
+       protected void resetIsAvailable() {
+               // try to avoid volatile access in isDone()
+               if (isAvailable == AVAILABLE || isAvailable.isDone()) {
+                       isAvailable = new CompletableFuture<>();
+               }
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
deleted file mode 100644
index 00fa782..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition.consumer;
-
-/**
- * Listener interface implemented by consumers of {@link InputGate} instances
- * that want to be notified of availability of buffer or event instances.
- */
-public interface InputGateListener {
-
-       /**
-        * Notification callback if the input gate moves from zero to non-zero
-        * available input channels with data.
-        *
-        * @param inputGate Input Gate that became available.
-        */
-       void notifyInputGateNonEmpty(InputGate inputGate);
-
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 750e678..c0830fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -101,7 +102,7 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * in two partitions (Partition 1 and 2). Each of these partitions is further 
partitioned into two
  * subpartitions -- one for each parallel reduce subtask.
  */
-public class SingleInputGate implements InputGate {
+public class SingleInputGate extends InputGate {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SingleInputGate.class);
 
@@ -172,9 +173,6 @@ public class SingleInputGate implements InputGate {
        /** Flag indicating whether all resources have been released. */
        private volatile boolean isReleased;
 
-       /** Registered listener to forward buffer notifications to. */
-       private volatile InputGateListener inputGateListener;
-
        private final List<TaskEvent> pendingEvents = new ArrayList<>();
 
        private int numberOfUninitializedChannels;
@@ -547,6 +545,7 @@ public class SingleInputGate implements InputGate {
                                                inputChannelsWithData.wait();
                                        }
                                        else {
+                                               resetIsAvailable();
                                                return Optional.empty();
                                        }
                                }
@@ -563,6 +562,10 @@ public class SingleInputGate implements InputGate {
                                }
 
                                moreAvailable = 
!inputChannelsWithData.isEmpty();
+
+                               if (!moreAvailable) {
+                                       resetIsAvailable();
+                               }
                        }
                } while (!result.isPresent());
 
@@ -613,15 +616,6 @@ public class SingleInputGate implements InputGate {
        // Channel notifications
        // 
------------------------------------------------------------------------
 
-       @Override
-       public void registerListener(InputGateListener inputGateListener) {
-               if (this.inputGateListener == null) {
-                       this.inputGateListener = inputGateListener;
-               } else {
-                       throw new IllegalStateException("Multiple listeners");
-               }
-       }
-
        void notifyChannelNonEmpty(InputChannel channel) {
                queueChannel(checkNotNull(channel));
        }
@@ -633,6 +627,8 @@ public class SingleInputGate implements InputGate {
        private void queueChannel(InputChannel channel) {
                int availableChannels;
 
+               CompletableFuture<?> toNotify = null;
+
                synchronized (inputChannelsWithData) {
                        if 
(enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
                                return;
@@ -644,14 +640,13 @@ public class SingleInputGate implements InputGate {
 
                        if (availableChannels == 0) {
                                inputChannelsWithData.notifyAll();
+                               toNotify = isAvailable;
+                               isAvailable = AVAILABLE;
                        }
                }
 
-               if (availableChannels == 0) {
-                       InputGateListener listener = inputGateListener;
-                       if (listener != null) {
-                               listener.notifyInputGateNonEmpty(this);
-                       }
+               if (toNotify != null) {
+                       toNotify.complete(null);
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 7d457ee..986d1bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <strong>It is NOT possible to recursively union union input gates.</strong>
  */
-public class UnionInputGate implements InputGate, InputGateListener {
+public class UnionInputGate extends InputGate {
 
        /** The input gates to union. */
        private final InputGate[] inputGates;
@@ -79,9 +80,6 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
        /** The total number of input channels across all unioned input gates. 
*/
        private final int totalNumberOfInputChannels;
 
-       /** Registered listener to forward input gate notifications to. */
-       private volatile InputGateListener inputGateListener;
-
        /**
         * A mapping from input gate to (logical) channel index offset. Valid 
channel indexes go from 0
         * (inclusive) to the total number of input channels (exclusive).
@@ -100,20 +98,31 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
 
                int currentNumberOfInputChannels = 0;
 
-               for (InputGate inputGate : inputGates) {
-                       if (inputGate instanceof UnionInputGate) {
-                               // if we want to add support for this, we need 
to implement pollNextBufferOrEvent()
-                               throw new UnsupportedOperationException("Cannot 
union a union of input gates.");
-                       }
+               synchronized (inputGatesWithData) {
+                       for (InputGate inputGate : inputGates) {
+                               if (inputGate instanceof UnionInputGate) {
+                                       // if we want to add support for this, 
we need to implement pollNextBufferOrEvent()
+                                       throw new 
UnsupportedOperationException("Cannot union a union of input gates.");
+                               }
 
-                       // The offset to use for buffer or event instances 
received from this input gate.
-                       inputGateToIndexOffsetMap.put(checkNotNull(inputGate), 
currentNumberOfInputChannels);
-                       inputGatesWithRemainingData.add(inputGate);
+                               // The offset to use for buffer or event 
instances received from this input gate.
+                               
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), 
currentNumberOfInputChannels);
+                               inputGatesWithRemainingData.add(inputGate);
 
-                       currentNumberOfInputChannels += 
inputGate.getNumberOfInputChannels();
+                               currentNumberOfInputChannels += 
inputGate.getNumberOfInputChannels();
 
-                       // Register the union gate as a listener for all input 
gates
-                       inputGate.registerListener(this);
+                               CompletableFuture<?> available = 
inputGate.isAvailable();
+
+                               if (available.isDone()) {
+                                       inputGatesWithData.add(inputGate);
+                               } else {
+                                       available.thenRun(() -> 
queueInputGate(inputGate));
+                               }
+                       }
+
+                       if (!inputGatesWithData.isEmpty()) {
+                               isAvailable = AVAILABLE;
+                       }
                }
 
                this.totalNumberOfInputChannels = currentNumberOfInputChannels;
@@ -209,6 +218,7 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                                        if (blocking) {
                                                inputGatesWithData.wait();
                                        } else {
+                                               resetIsAvailable();
                                                return Optional.empty();
                                        }
                                }
@@ -223,6 +233,12 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                                if (bufferOrEvent.isPresent() && 
bufferOrEvent.get().moreAvailable()) {
                                        // enqueue the inputGate at the end to 
avoid starvation
                                        inputGatesWithData.add(inputGate);
+                               } else {
+                                       inputGate.isAvailable().thenRun(() -> 
queueInputGate(inputGate));
+                               }
+
+                               if (inputGatesWithData.isEmpty()) {
+                                       resetIsAvailable();
                                }
 
                                if (bufferOrEvent.isPresent()) {
@@ -255,15 +271,6 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
        }
 
        @Override
-       public void registerListener(InputGateListener listener) {
-               if (this.inputGateListener == null) {
-                       this.inputGateListener = listener;
-               } else {
-                       throw new IllegalStateException("Multiple listeners");
-               }
-       }
-
-       @Override
        public int getPageSize() {
                int pageSize = -1;
                for (InputGate gate : inputGates) {
@@ -280,33 +287,29 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
        public void close() throws IOException {
        }
 
-       @Override
-       public void notifyInputGateNonEmpty(InputGate inputGate) {
-               queueInputGate(checkNotNull(inputGate));
-       }
-
        private void queueInputGate(InputGate inputGate) {
-               int availableInputGates;
+               checkNotNull(inputGate);
+
+               CompletableFuture<?> toNotify = null;
 
                synchronized (inputGatesWithData) {
                        if (inputGatesWithData.contains(inputGate)) {
                                return;
                        }
 
-                       availableInputGates = inputGatesWithData.size();
+                       int availableInputGates = inputGatesWithData.size();
 
                        inputGatesWithData.add(inputGate);
 
                        if (availableInputGates == 0) {
                                inputGatesWithData.notifyAll();
+                               toNotify = isAvailable;
+                               isAvailable = AVAILABLE;
                        }
                }
 
-               if (availableInputGates == 0) {
-                       InputGateListener listener = inputGateListener;
-                       if (listener != null) {
-                               listener.notifyInputGateNonEmpty(this);
-                       }
+               if (toNotify != null) {
+                       toNotify.complete(null);
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
index 2214887..ddf027c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
@@ -27,9 +27,12 @@ import org.junit.runners.Parameterized.Parameters;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test base for {@link InputGate}.
@@ -45,6 +48,28 @@ public abstract class InputGateTestBase {
                return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
        }
 
+       protected void testIsAvailable(
+                       InputGate inputGateToTest,
+                       SingleInputGate inputGateToNotify,
+                       TestInputChannel inputChannelWithNewData) throws 
Exception {
+
+               assertFalse(inputGateToTest.isAvailable().isDone());
+               
assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent());
+
+               CompletableFuture<?> isAvailable = 
inputGateToTest.isAvailable();
+
+               assertFalse(inputGateToTest.isAvailable().isDone());
+               
assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent());
+
+               assertEquals(isAvailable, inputGateToTest.isAvailable());
+
+               inputChannelWithNewData.readBuffer();
+               
inputGateToNotify.notifyChannelNonEmpty(inputChannelWithNewData);
+
+               assertTrue(isAvailable.isDone());
+               assertTrue(inputGateToTest.isAvailable().isDone());
+       }
+
        protected SingleInputGate createInputGate() {
                return createInputGate(2);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 6a5415a..a6f824d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -116,6 +116,15 @@ public class SingleInputGateTest extends InputGateTestBase 
{
                assertTrue(inputGate.isFinished());
        }
 
+       @Test
+       public void testIsAvailable() throws Exception {
+               final SingleInputGate inputGate = createInputGate(1);
+               TestInputChannel inputChannel = new TestInputChannel(inputGate, 
0);
+               inputGate.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel);
+
+               testIsAvailable(inputGate, inputGate, inputChannel);
+       }
+
        @Test(timeout = 120 * 1000)
        public void testIsMoreAvailableReadingFromSingleInputChannel() throws 
Exception {
                // Setup
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 464ab7c..8404663 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,16 +20,8 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.lang.reflect.Field;
-import java.util.ArrayDeque;
-
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
 /**
@@ -44,39 +36,8 @@ public class TestSingleInputGate {
        public TestSingleInputGate(int numberOfInputChannels, boolean 
initialize) {
                checkArgument(numberOfInputChannels >= 1);
 
-               SingleInputGate realGate = 
createSingleInputGate(numberOfInputChannels);
-
-               this.inputGate = spy(realGate);
-
-               // Notify about late registrations (added for 
DataSinkTaskTest#testUnionDataSinkTask).
-               // After merging registerInputOutput and invoke, we have to 
make sure that the test
-               // notifications happen at the expected time. In real programs, 
this is guaranteed by
-               // the instantiation and request partition life cycle.
-               try {
-                       Field f = 
realGate.getClass().getDeclaredField("inputChannelsWithData");
-                       f.setAccessible(true);
-                       final ArrayDeque<InputChannel> notifications = 
(ArrayDeque<InputChannel>) f.get(realGate);
-
-                       doAnswer(new Answer<Void>() {
-                               @Override
-                               public Void answer(InvocationOnMock invocation) 
throws Throwable {
-                                       invocation.callRealMethod();
-
-                                       synchronized (notifications) {
-                                               if (!notifications.isEmpty()) {
-                                                       InputGateListener 
listener = (InputGateListener) invocation.getArguments()[0];
-                                                       
listener.notifyInputGateNonEmpty(inputGate);
-                                               }
-                                       }
-
-                                       return null;
-                               }
-                       
}).when(inputGate).registerListener(any(InputGateListener.class));
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               this.inputChannels = new 
TestInputChannel[numberOfInputChannels];
+               inputGate = spy(createSingleInputGate(numberOfInputChannels));
+               inputChannels = new TestInputChannel[numberOfInputChannels];
 
                if (initialize) {
                        for (int i = 0; i < numberOfInputChannels; i++) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 448071f..9dca613 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
 import org.junit.Test;
 
-import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -101,4 +102,17 @@ public class UnionInputGateTest extends InputGateTestBase {
                assertTrue(union.isFinished());
                assertFalse(union.getNextBufferOrEvent().isPresent());
        }
+
+       @Test
+       public void testIsAvailable() throws Exception {
+               final SingleInputGate inputGate1 = createInputGate(1);
+               TestInputChannel inputChannel1 = new 
TestInputChannel(inputGate1, 0);
+               inputGate1.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel1);
+
+               final SingleInputGate inputGate2 = createInputGate(1);
+               TestInputChannel inputChannel2 = new 
TestInputChannel(inputGate2, 0);
+               inputGate2.setInputChannel(new IntermediateResultPartitionID(), 
inputChannel2);
+
+               testIsAvailable(new UnionInputGate(inputGate1, inputGate2), 
inputGate1, inputChannel1);
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index ded52ff..b1a3ad5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import 
org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import org.junit.Test;
 
@@ -130,7 +129,7 @@ public class BarrierBufferMassiveRandomTest {
                }
        }
 
-       private static class RandomGeneratingInputGate implements InputGate {
+       private static class RandomGeneratingInputGate extends InputGate {
 
                private final int numberOfChannels;
                private final BufferPool[] bufferPools;
@@ -151,6 +150,7 @@ public class BarrierBufferMassiveRandomTest {
                        this.bufferPools = bufferPools;
                        this.barrierGens = barrierGens;
                        this.owningTaskName = owningTaskName;
+                       this.isAvailable = AVAILABLE;
                }
 
                @Override
@@ -199,9 +199,6 @@ public class BarrierBufferMassiveRandomTest {
                public void sendTaskEvent(TaskEvent event) {}
 
                @Override
-               public void registerListener(InputGateListener listener) {}
-
-               @Override
                public int getPageSize() {
                        return PAGE_SIZE;
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index a29cbf5..30941ab 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import 
org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import java.util.ArrayDeque;
 import java.util.List;
@@ -32,7 +31,7 @@ import java.util.Queue;
 /**
  * Mock {@link InputGate}.
  */
-public class MockInputGate implements InputGate {
+public class MockInputGate extends InputGate {
 
        private final int pageSize;
 
@@ -56,6 +55,8 @@ public class MockInputGate implements InputGate {
                this.bufferOrEvents = new 
ArrayDeque<BufferOrEvent>(bufferOrEvents);
                this.closed = new boolean[numberOfChannels];
                this.owningTaskName = owningTaskName;
+
+               isAvailable = AVAILABLE;
        }
 
        @Override
@@ -111,10 +112,6 @@ public class MockInputGate implements InputGate {
        }
 
        @Override
-       public void registerListener(InputGateListener listener) {
-       }
-
-       @Override
        public void close() {
        }
 }

Reply via email to