This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8b564944003eca2377358e4031d65c987347ade
Author: Yun Gao <[email protected]>
AuthorDate: Tue Jun 15 17:34:24 2021 +0800

    [FLINK-21085][runtime] SingleInputGate supports acquiring unfinished 
channels
---
 .../partition/consumer/IndexedInputGate.java       |  4 ++
 .../partition/consumer/SingleInputGate.java        | 16 ++++++++
 .../runtime/taskmanager/InputGateWithMetrics.java  |  6 +++
 .../partition/consumer/SingleInputGateTest.java    | 44 ++++++++++++++++++++++
 .../streaming/runtime/io/MockIndexedInputGate.java |  6 +++
 .../flink/streaming/runtime/io/MockInputGate.java  |  6 +++
 .../AlignedCheckpointsMassiveRandomTest.java       |  6 +++
 7 files changed, 88 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index be837f4..9ac4be7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -22,12 +22,16 @@ import 
org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
+import java.util.List;
 
 /** An {@link InputGate} with a specific index. */
 public abstract class IndexedInputGate extends InputGate implements 
CheckpointableInput {
     /** Returns the index of this input gate. Only supported on */
     public abstract int getGateIndex();
 
+    /** Returns the list of channels that have not received 
EndOfPartitionEvent. */
+    public abstract List<InputChannelInfo> getUnfinishedChannels();
+
     @Override
     public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
         for (int index = 0, numChannels = getNumberOfInputChannels();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 03a2a50..4e3bad6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -350,6 +350,22 @@ public class SingleInputGate extends IndexedInputGate {
         return gateIndex;
     }
 
+    @Override
+    public List<InputChannelInfo> getUnfinishedChannels() {
+        List<InputChannelInfo> unfinishedChannels =
+                new ArrayList<>(
+                        numberOfInputChannels - 
channelsWithEndOfPartitionEvents.cardinality());
+        synchronized (inputChannelsWithData) {
+            for (int i = channelsWithEndOfPartitionEvents.nextClearBit(0);
+                    i < numberOfInputChannels;
+                    i = channelsWithEndOfPartitionEvents.nextClearBit(i + 1)) {
+                unfinishedChannels.add(getChannel(i).getChannelInfo());
+            }
+        }
+
+        return unfinishedChannels;
+    }
+
     /**
      * Returns the type of this input channel's consumed result partition.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index cd9f91c..d6920dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -75,6 +76,11 @@ public class InputGateWithMetrics extends IndexedInputGate {
     }
 
     @Override
+    public List<InputChannelInfo> getUnfinishedChannels() {
+        return inputGate.getUnfinishedChannels();
+    }
+
+    @Override
     public boolean isFinished() {
         return inputGate.isFinished();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 75d694a..36df101 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -67,6 +67,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -885,6 +887,48 @@ public class SingleInputGateTest extends InputGateTestBase 
{
         }
     }
 
+    @Test
+    public void testGetUnfinishedChannels() throws IOException, 
InterruptedException {
+        SingleInputGate inputGate =
+                new SingleInputGateBuilder()
+                        .setSingleInputGateIndex(1)
+                        .setNumberOfChannels(3)
+                        .build();
+        final TestInputChannel[] inputChannels =
+                new TestInputChannel[] {
+                    new TestInputChannel(inputGate, 0),
+                    new TestInputChannel(inputGate, 1),
+                    new TestInputChannel(inputGate, 2)
+                };
+        inputGate.setInputChannels(inputChannels);
+
+        assertEquals(
+                Arrays.asList(
+                        inputChannels[0].getChannelInfo(),
+                        inputChannels[1].getChannelInfo(),
+                        inputChannels[2].getChannelInfo()),
+                inputGate.getUnfinishedChannels());
+
+        inputChannels[1].readEndOfPartitionEvent();
+        inputGate.notifyChannelNonEmpty(inputChannels[1]);
+        inputGate.getNext();
+        assertEquals(
+                Arrays.asList(inputChannels[0].getChannelInfo(), 
inputChannels[2].getChannelInfo()),
+                inputGate.getUnfinishedChannels());
+
+        inputChannels[0].readEndOfPartitionEvent();
+        inputGate.notifyChannelNonEmpty(inputChannels[0]);
+        inputGate.getNext();
+        assertEquals(
+                Collections.singletonList(inputChannels[2].getChannelInfo()),
+                inputGate.getUnfinishedChannels());
+
+        inputChannels[2].readEndOfPartitionEvent();
+        inputGate.notifyChannelNonEmpty(inputChannels[2]);
+        inputGate.getNext();
+        assertEquals(Collections.emptyList(), 
inputGate.getUnfinishedChannels());
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     private static Map<InputGateID, SingleInputGate> 
createInputGateWithLocalChannels(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index 368db2a..b11580e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -107,4 +108,9 @@ public class MockIndexedInputGate extends IndexedInputGate {
     public int getGateIndex() {
         return gateIndex;
     }
+
+    @Override
+    public List<InputChannelInfo> getUnfinishedChannels() {
+        return Collections.emptyList();
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 8a94c2f..9346be6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -166,4 +167,9 @@ public class MockInputGate extends IndexedInputGate {
     public int getGateIndex() {
         return 0;
     }
+
+    @Override
+    public List<InputChannelInfo> getUnfinishedChannels() {
+        return Collections.emptyList();
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index 532e0eb..16b4acb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
@@ -254,5 +255,10 @@ public class AlignedCheckpointsMassiveRandomTest {
         public int getGateIndex() {
             return 0;
         }
+
+        @Override
+        public List<InputChannelInfo> getUnfinishedChannels() {
+            return Collections.emptyList();
+        }
     }
 }

Reply via email to