This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b399a145cde22dba7a8255b05309daee14e6c1da Author: Arvid Heise <[email protected]> AuthorDate: Mon Jun 15 21:09:13 2020 +0200 [FLINK-18094][network] Add InputGate#getChannelInfos for easier testing. In the following commits, this method will be used to fetch information about all channels without explicitly needing to access the channels. Thus, in tests, mocks just need to return meaningful InputChannelInfos instead of actually creating the respective channels. --- .../runtime/io/network/partition/consumer/InputGate.java | 13 +++++++++++++ .../flink/streaming/runtime/io/CheckpointedInputGate.java | 6 ++++++ .../flink/streaming/runtime/io/MockIndexedInputGate.java | 11 +++++++++++ .../apache/flink/streaming/runtime/io/MockInputGate.java | 10 ++++++++++ 4 files changed, 40 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 44420b8..0489fde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -19,14 +19,18 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.PullingAsyncDataInput; import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -118,6 +122,15 @@ public 
abstract class InputGate implements PullingAsyncDataInput<BufferOrEvent>, public abstract InputChannel getChannel(int channelIndex); /** + * Returns the channel infos of this gate. + */ + public List<InputChannelInfo> getChannelInfos() { + return IntStream.range(0, getNumberOfInputChannels()) + .mapToObj(index -> getChannel(index).getChannelInfo()) + .collect(Collectors.toList()); + } + + /** * Simple pojo for INPUT, DATA and moreAvailable. */ protected static class InputWithData<INPUT, DATA> { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java index 65fa098..9428515 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.PullingAsyncDataInput; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -231,6 +233,10 @@ public class CheckpointedInputGate implements PullingAsyncDataInput<BufferOrEven return inputGate.getChannel(channelIndex); } + public List<InputChannelInfo> getChannelInfos() { + return inputGate.getChannelInfos(); + } + @VisibleForTesting CheckpointBarrierHandler getCheckpointBarrierHandler() { return 
barrierHandler; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java index a04ec55..3ec99a8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java @@ -19,15 +19,19 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Mock {@link IndexedInputGate}. 
@@ -73,6 +77,13 @@ public class MockIndexedInputGate extends IndexedInputGate { } @Override + public List<InputChannelInfo> getChannelInfos() { + return IntStream.range(0, numberOfInputChannels) + .mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex)) + .collect(Collectors.toList()); + } + + @Override public boolean isFinished() { return false; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index d0e958a..a536cbd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener; @@ -33,6 +34,8 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Mock {@link InputGate}. @@ -89,6 +92,13 @@ public class MockInputGate extends InputGate { } @Override + public List<InputChannelInfo> getChannelInfos() { + return IntStream.range(0, numberOfChannels) + .mapToObj(channelIndex -> new InputChannelInfo(0, channelIndex)) + .collect(Collectors.toList()); + } + + @Override public boolean isFinished() { return finishAfterLastBuffer && bufferOrEvents.isEmpty(); }
