This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b399a145cde22dba7a8255b05309daee14e6c1da
Author: Arvid Heise <[email protected]>
AuthorDate: Mon Jun 15 21:09:13 2020 +0200

    [FLINK-18094][network] Add InputGate#getChannelInfos for easier testing.
    
    In the following commits, this method will be used to fetch information
    about all channels without explicitly needing to access the channels.
    Thus, in tests, mocks only need to return meaningful InputChannelInfos
    instead of actually creating the respective channels.
---
 .../runtime/io/network/partition/consumer/InputGate.java    | 13 +++++++++++++
 .../flink/streaming/runtime/io/CheckpointedInputGate.java   |  6 ++++++
 .../flink/streaming/runtime/io/MockIndexedInputGate.java    | 11 +++++++++++
 .../apache/flink/streaming/runtime/io/MockInputGate.java    | 10 ++++++++++
 4 files changed, 40 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 44420b8..0489fde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -19,14 +19,18 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -118,6 +122,15 @@ public abstract class InputGate implements 
PullingAsyncDataInput<BufferOrEvent>,
        public abstract InputChannel getChannel(int channelIndex);
 
        /**
+        * Returns the channel infos of this gate.
+        */
+       public List<InputChannelInfo> getChannelInfos() {
+               return IntStream.range(0, getNumberOfInputChannels())
+                       .mapToObj(index -> getChannel(index).getChannelInfo())
+                       .collect(Collectors.toList());
+       }
+
+       /**
         * Simple pojo for INPUT, DATA and moreAvailable.
         */
        protected static class InputWithData<INPUT, DATA> {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 65fa098..9428515 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -231,6 +233,10 @@ public class CheckpointedInputGate implements 
PullingAsyncDataInput<BufferOrEven
                return inputGate.getChannel(channelIndex);
        }
 
+       public List<InputChannelInfo> getChannelInfos() {
+               return inputGate.getChannelInfos();
+       }
+
        @VisibleForTesting
        CheckpointBarrierHandler getCheckpointBarrierHandler() {
                return barrierHandler;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index a04ec55..3ec99a8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -19,15 +19,19 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Mock {@link IndexedInputGate}.
@@ -73,6 +77,13 @@ public class MockIndexedInputGate extends IndexedInputGate {
        }
 
        @Override
+       public List<InputChannelInfo> getChannelInfos() {
+               return IntStream.range(0, numberOfInputChannels)
+                               .mapToObj(channelIndex -> new 
InputChannelInfo(gateIndex, channelIndex))
+                               .collect(Collectors.toList());
+       }
+
+       @Override
        public boolean isFinished() {
                return false;
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index d0e958a..a536cbd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -33,6 +34,8 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Mock {@link InputGate}.
@@ -89,6 +92,13 @@ public class MockInputGate extends InputGate {
        }
 
        @Override
+       public List<InputChannelInfo> getChannelInfos() {
+               return IntStream.range(0, numberOfChannels)
+                       .mapToObj(channelIndex -> new InputChannelInfo(0, 
channelIndex))
+                       .collect(Collectors.toList());
+       }
+
+       @Override
        public boolean isFinished() {
                return finishAfterLastBuffer && bufferOrEvents.isEmpty();
        }

Reply via email to