This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4527d05c810fed6329bf0789e27cc43bd816e754 Author: Piotr Nowojski <[email protected]> AuthorDate: Tue May 7 14:05:47 2019 +0200 [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates This commit replaces listeners in InputGates with `CompletableFuture<?> isAvailable`. Such change makes the contract simpler, since it avoids problems with: - only one possible listener - race conditions between registering listeners and firing the notification - by design handles a situation when a notification is fired more then once - it will integrate better with a recently proposed `Input` and `SourceReader` interfaces --- .../io/network/partition/consumer/InputGate.java | 38 ++++++++--- .../partition/consumer/InputGateListener.java | 35 ---------- .../partition/consumer/SingleInputGate.java | 31 ++++----- .../network/partition/consumer/UnionInputGate.java | 75 +++++++++++----------- .../partition/consumer/InputGateTestBase.java | 25 ++++++++ .../partition/consumer/SingleInputGateTest.java | 9 +++ .../partition/consumer/TestSingleInputGate.java | 43 +------------ .../partition/consumer/UnionInputGateTest.java | 16 ++++- .../runtime/io/BarrierBufferMassiveRandomTest.java | 7 +- .../flink/streaming/runtime/io/MockInputGate.java | 9 +-- 10 files changed, 136 insertions(+), 152 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index c270d37..83e18e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; /** * An input gate consumes one or more partitions of a single produced intermediate result. @@ -65,33 +66,50 @@ import java.util.Optional; * will have an input gate attached to it. This will provide its input, which will consist of one * subpartition from each partition of the intermediate result. */ -public interface InputGate extends AutoCloseable { +public abstract class InputGate implements AutoCloseable { - int getNumberOfInputChannels(); + public static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null); - String getOwningTaskName(); + protected CompletableFuture<?> isAvailable = new CompletableFuture<>(); - boolean isFinished(); + public abstract int getNumberOfInputChannels(); - void requestPartitions() throws IOException, InterruptedException; + public abstract String getOwningTaskName(); + + public abstract boolean isFinished(); + + public abstract void requestPartitions() throws IOException, InterruptedException; /** * Blocking call waiting for next {@link BufferOrEvent}. * * @return {@code Optional.empty()} if {@link #isFinished()} returns true. */ - Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException; + public abstract Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException; /** * Poll the {@link BufferOrEvent}. * * @return {@code Optional.empty()} if there is no data to return or if {@link #isFinished()} returns true. */ - Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException; + public abstract Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException; + + public abstract void sendTaskEvent(TaskEvent event) throws IOException; - void sendTaskEvent(TaskEvent event) throws IOException; + public abstract int getPageSize(); - void registerListener(InputGateListener listener); + /** + * @return a future that is completed if there are more records available. If there more records + * available immediately, {@link #AVAILABLE} should be returned. + */ + public CompletableFuture<?> isAvailable() { + return isAvailable; + } - int getPageSize(); + protected void resetIsAvailable() { + // try to avoid volatile access in isDone()} + if (isAvailable == AVAILABLE || isAvailable.isDone()) { + isAvailable = new CompletableFuture<>(); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java deleted file mode 100644 index 00fa782..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition.consumer; - -/** - * Listener interface implemented by consumers of {@link InputGate} instances - * that want to be notified of availability of buffer or event instances. - */ -public interface InputGateListener { - - /** - * Notification callback if the input gate moves from zero to non-zero - * available input channels with data. - * - * @param inputGate Input Gate that became available. - */ - void notifyInputGateNonEmpty(InputGate inputGate); - -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 750e678..c0830fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -56,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Timer; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -101,7 +102,7 @@ import static org.apache.flink.util.Preconditions.checkState; * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two * subpartitions -- one for each parallel reduce subtask. */ -public class SingleInputGate implements InputGate { +public class SingleInputGate extends InputGate { private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class); @@ -172,9 +173,6 @@ public class SingleInputGate implements InputGate { /** Flag indicating whether all resources have been released. */ private volatile boolean isReleased; - /** Registered listener to forward buffer notifications to. */ - private volatile InputGateListener inputGateListener; - private final List<TaskEvent> pendingEvents = new ArrayList<>(); private int numberOfUninitializedChannels; @@ -547,6 +545,7 @@ public class SingleInputGate implements InputGate { inputChannelsWithData.wait(); } else { + resetIsAvailable(); return Optional.empty(); } } @@ -563,6 +562,10 @@ public class SingleInputGate implements InputGate { } moreAvailable = !inputChannelsWithData.isEmpty(); + + if (!moreAvailable) { + resetIsAvailable(); + } } } while (!result.isPresent()); @@ -613,15 +616,6 @@ public class SingleInputGate implements InputGate { // Channel notifications // ------------------------------------------------------------------------ - @Override - public void registerListener(InputGateListener inputGateListener) { - if (this.inputGateListener == null) { - this.inputGateListener = inputGateListener; - } else { - throw new IllegalStateException("Multiple listeners"); - } - } - void notifyChannelNonEmpty(InputChannel channel) { queueChannel(checkNotNull(channel)); } @@ -633,6 +627,8 @@ public class SingleInputGate implements InputGate { private void queueChannel(InputChannel channel) { int availableChannels; + CompletableFuture<?> toNotify = null; + synchronized (inputChannelsWithData) { if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) { return; @@ -644,14 +640,13 @@ public class SingleInputGate implements InputGate { if (availableChannels == 0) { inputChannelsWithData.notifyAll(); + toNotify = isAvailable; + isAvailable = AVAILABLE; } } - if (availableChannels == 0) { - InputGateListener listener = inputGateListener; - if (listener != null) { - listener.notifyInputGateNonEmpty(this); - } + if (toNotify != null) { + toNotify.complete(null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 7d457ee..986d1bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -30,6 +30,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkState; * * <strong>It is NOT possible to recursively union union input gates.</strong> */ -public class UnionInputGate implements InputGate, InputGateListener { +public class UnionInputGate extends InputGate { /** The input gates to union. */ private final InputGate[] inputGates; @@ -79,9 +80,6 @@ public class UnionInputGate implements InputGate, InputGateListener { /** The total number of input channels across all unioned input gates. */ private final int totalNumberOfInputChannels; - /** Registered listener to forward input gate notifications to. */ - private volatile InputGateListener inputGateListener; - /** * A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0 * (inclusive) to the total number of input channels (exclusive). @@ -100,20 +98,31 @@ public class UnionInputGate implements InputGate, InputGateListener { int currentNumberOfInputChannels = 0; - for (InputGate inputGate : inputGates) { - if (inputGate instanceof UnionInputGate) { - // if we want to add support for this, we need to implement pollNextBufferOrEvent() - throw new UnsupportedOperationException("Cannot union a union of input gates."); - } + synchronized (inputGatesWithData) { + for (InputGate inputGate : inputGates) { + if (inputGate instanceof UnionInputGate) { + // if we want to add support for this, we need to implement pollNextBufferOrEvent() + throw new UnsupportedOperationException("Cannot union a union of input gates."); + } - // The offset to use for buffer or event instances received from this input gate. - inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); - inputGatesWithRemainingData.add(inputGate); + // The offset to use for buffer or event instances received from this input gate. + inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); + inputGatesWithRemainingData.add(inputGate); - currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); + currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); - // Register the union gate as a listener for all input gates - inputGate.registerListener(this); + CompletableFuture<?> available = inputGate.isAvailable(); + + if (available.isDone()) { + inputGatesWithData.add(inputGate); + } else { + available.thenRun(() -> queueInputGate(inputGate)); + } + } + + if (!inputGatesWithData.isEmpty()) { + isAvailable = AVAILABLE; + } } this.totalNumberOfInputChannels = currentNumberOfInputChannels; @@ -209,6 +218,7 @@ public class UnionInputGate implements InputGate, InputGateListener { if (blocking) { inputGatesWithData.wait(); } else { + resetIsAvailable(); return Optional.empty(); } } @@ -223,6 +233,12 @@ public class UnionInputGate implements InputGate, InputGateListener { if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) { // enqueue the inputGate at the end to avoid starvation inputGatesWithData.add(inputGate); + } else { + inputGate.isAvailable().thenRun(() -> queueInputGate(inputGate)); + } + + if (inputGatesWithData.isEmpty()) { + resetIsAvailable(); } if (bufferOrEvent.isPresent()) { @@ -255,15 +271,6 @@ public class UnionInputGate implements InputGate, InputGateListener { } @Override - public void registerListener(InputGateListener listener) { - if (this.inputGateListener == null) { - this.inputGateListener = listener; - } else { - throw new IllegalStateException("Multiple listeners"); - } - } - - @Override public int getPageSize() { int pageSize = -1; for (InputGate gate : inputGates) { @@ -280,33 +287,29 @@ public class UnionInputGate implements InputGate, InputGateListener { public void close() throws IOException { } - @Override - public void notifyInputGateNonEmpty(InputGate inputGate) { - queueInputGate(checkNotNull(inputGate)); - } - private void queueInputGate(InputGate inputGate) { - int availableInputGates; + checkNotNull(inputGate); + + CompletableFuture<?> toNotify = null; synchronized (inputGatesWithData) { if (inputGatesWithData.contains(inputGate)) { return; } - availableInputGates = inputGatesWithData.size(); + int availableInputGates = inputGatesWithData.size(); inputGatesWithData.add(inputGate); if (availableInputGates == 0) { inputGatesWithData.notifyAll(); + toNotify = isAvailable; + isAvailable = AVAILABLE; } } - if (availableInputGates == 0) { - InputGateListener listener = inputGateListener; - if (listener != null) { - listener.notifyInputGateNonEmpty(this); - } + if (toNotify != null) { + toNotify.complete(null); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java index 2214887..ddf027c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java @@ -27,9 +27,12 @@ import org.junit.runners.Parameterized.Parameters; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Test base for {@link InputGate}. @@ -45,6 +48,28 @@ public abstract class InputGateTestBase { return Arrays.asList(Boolean.TRUE, Boolean.FALSE); } + protected void testIsAvailable( + InputGate inputGateToTest, + SingleInputGate inputGateToNotify, + TestInputChannel inputChannelWithNewData) throws Exception { + + assertFalse(inputGateToTest.isAvailable().isDone()); + assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent()); + + CompletableFuture<?> isAvailable = inputGateToTest.isAvailable(); + + assertFalse(inputGateToTest.isAvailable().isDone()); + assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent()); + + assertEquals(isAvailable, inputGateToTest.isAvailable()); + + inputChannelWithNewData.readBuffer(); + inputGateToNotify.notifyChannelNonEmpty(inputChannelWithNewData); + + assertTrue(isAvailable.isDone()); + assertTrue(inputGateToTest.isAvailable().isDone()); + } + protected SingleInputGate createInputGate() { return createInputGate(2); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 6a5415a..a6f824d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -116,6 +116,15 @@ public class SingleInputGateTest extends InputGateTestBase { assertTrue(inputGate.isFinished()); } + @Test + public void testIsAvailable() throws Exception { + final SingleInputGate inputGate = createInputGate(1); + TestInputChannel inputChannel = new TestInputChannel(inputGate, 0); + inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel); + + testIsAvailable(inputGate, inputGate, inputChannel); + } + @Test(timeout = 120 * 1000) public void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception { // Setup diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 464ab7c..8404663 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -20,16 +20,8 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.lang.reflect.Field; -import java.util.ArrayDeque; - import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; /** @@ -44,39 +36,8 @@ public class TestSingleInputGate { public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { checkArgument(numberOfInputChannels >= 1); - SingleInputGate realGate = createSingleInputGate(numberOfInputChannels); - - this.inputGate = spy(realGate); - - // Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask). - // After merging registerInputOutput and invoke, we have to make sure that the test - // notifications happen at the expected time. In real programs, this is guaranteed by - // the instantiation and request partition life cycle. - try { - Field f = realGate.getClass().getDeclaredField("inputChannelsWithData"); - f.setAccessible(true); - final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - invocation.callRealMethod(); - - synchronized (notifications) { - if (!notifications.isEmpty()) { - InputGateListener listener = (InputGateListener) invocation.getArguments()[0]; - listener.notifyInputGateNonEmpty(inputGate); - } - } - - return null; - } - }).when(inputGate).registerListener(any(InputGateListener.class)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.inputChannels = new TestInputChannel[numberOfInputChannels]; + inputGate = spy(createSingleInputGate(numberOfInputChannels)); + inputChannels = new TestInputChannel[numberOfInputChannels]; if (initialize) { for (int i = 0; i < numberOfInputChannels; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 448071f..9dca613 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + import org.junit.Test; -import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -101,4 +102,17 @@ public class UnionInputGateTest extends InputGateTestBase { assertTrue(union.isFinished()); assertFalse(union.getNextBufferOrEvent().isPresent()); } + + @Test + public void testIsAvailable() throws Exception { + final SingleInputGate inputGate1 = createInputGate(1); + TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0); + inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1); + + final SingleInputGate inputGate2 = createInputGate(1); + TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0); + inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2); + + testIsAvailable(new UnionInputGate(inputGate1, inputGate2), inputGate1, inputChannel1); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index ded52ff..b1a3ad5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import org.junit.Test; @@ -130,7 +129,7 @@ public class BarrierBufferMassiveRandomTest { } } - private static class RandomGeneratingInputGate implements InputGate { + private static class RandomGeneratingInputGate extends InputGate { private final int numberOfChannels; private final BufferPool[] bufferPools; @@ -151,6 +150,7 @@ public class BarrierBufferMassiveRandomTest { this.bufferPools = bufferPools; this.barrierGens = barrierGens; this.owningTaskName = owningTaskName; + this.isAvailable = AVAILABLE; } @Override @@ -199,9 +199,6 @@ public class BarrierBufferMassiveRandomTest { public void sendTaskEvent(TaskEvent event) {} @Override - public void registerListener(InputGateListener listener) {} - - @Override public int getPageSize() { return PAGE_SIZE; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index a29cbf5..30941ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import java.util.ArrayDeque; import java.util.List; @@ -32,7 +31,7 @@ import java.util.Queue; /** * Mock {@link InputGate}. */ -public class MockInputGate implements InputGate { +public class MockInputGate extends InputGate { private final int pageSize; @@ -56,6 +55,8 @@ public class MockInputGate implements InputGate { this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents); this.closed = new boolean[numberOfChannels]; this.owningTaskName = owningTaskName; + + isAvailable = AVAILABLE; } @Override @@ -111,10 +112,6 @@ public class MockInputGate implements InputGate { } @Override - public void registerListener(InputGateListener listener) { - } - - @Override public void close() { } }
