This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8b564944003eca2377358e4031d65c987347ade Author: Yun Gao <[email protected]> AuthorDate: Tue Jun 15 17:34:24 2021 +0800 [FLINK-21085][runtime] SingleInputGate supports acquiring unfinished channels --- .../partition/consumer/IndexedInputGate.java | 4 ++ .../partition/consumer/SingleInputGate.java | 16 ++++++++ .../runtime/taskmanager/InputGateWithMetrics.java | 6 +++ .../partition/consumer/SingleInputGateTest.java | 44 ++++++++++++++++++++++ .../streaming/runtime/io/MockIndexedInputGate.java | 6 +++ .../flink/streaming/runtime/io/MockInputGate.java | 6 +++ .../AlignedCheckpointsMassiveRandomTest.java | 6 +++ 7 files changed, 88 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java index be837f4..9ac4be7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java @@ -22,12 +22,16 @@ import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import java.io.IOException; +import java.util.List; /** An {@link InputGate} with a specific index. */ public abstract class IndexedInputGate extends InputGate implements CheckpointableInput { /** Returns the index of this input gate. Only supported on */ public abstract int getGateIndex(); + /** Returns the list of channels that have not received EndOfPartitionEvent. */ + public abstract List<InputChannelInfo> getUnfinishedChannels(); + @Override public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { for (int index = 0, numChannels = getNumberOfInputChannels(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 03a2a50..4e3bad6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -350,6 +350,22 @@ public class SingleInputGate extends IndexedInputGate { return gateIndex; } + @Override + public List<InputChannelInfo> getUnfinishedChannels() { + List<InputChannelInfo> unfinishedChannels = + new ArrayList<>( + numberOfInputChannels - channelsWithEndOfPartitionEvents.cardinality()); + synchronized (inputChannelsWithData) { + for (int i = channelsWithEndOfPartitionEvents.nextClearBit(0); + i < numberOfInputChannels; + i = channelsWithEndOfPartitionEvents.nextClearBit(i + 1)) { + unfinishedChannels.add(getChannel(i).getChannelInfo()); + } + } + + return unfinishedChannels; + } + /** * Returns the type of this input channel's consumed result partition. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index cd9f91c..d6920dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -75,6 +76,11 @@ public class InputGateWithMetrics extends IndexedInputGate { } @Override + public List<InputChannelInfo> getUnfinishedChannels() { + return inputGate.getUnfinishedChannels(); + } + + @Override public boolean isFinished() { return inputGate.isFinished(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 75d694a..36df101 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -67,6 +67,8 @@ import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -885,6 +887,48 @@ public class SingleInputGateTest extends InputGateTestBase { } } + @Test + public void testGetUnfinishedChannels() throws IOException, InterruptedException { + SingleInputGate inputGate = + new SingleInputGateBuilder() + .setSingleInputGateIndex(1) + .setNumberOfChannels(3) + .build(); + final TestInputChannel[] inputChannels = + new TestInputChannel[] { + new TestInputChannel(inputGate, 0), + new TestInputChannel(inputGate, 1), + new TestInputChannel(inputGate, 2) + }; + inputGate.setInputChannels(inputChannels); + + assertEquals( + Arrays.asList( + inputChannels[0].getChannelInfo(), + inputChannels[1].getChannelInfo(), + inputChannels[2].getChannelInfo()), + inputGate.getUnfinishedChannels()); + + inputChannels[1].readEndOfPartitionEvent(); + inputGate.notifyChannelNonEmpty(inputChannels[1]); + inputGate.getNext(); + assertEquals( + Arrays.asList(inputChannels[0].getChannelInfo(), inputChannels[2].getChannelInfo()), + inputGate.getUnfinishedChannels()); + + inputChannels[0].readEndOfPartitionEvent(); + inputGate.notifyChannelNonEmpty(inputChannels[0]); + inputGate.getNext(); + assertEquals( + Collections.singletonList(inputChannels[2].getChannelInfo()), + inputGate.getUnfinishedChannels()); + + inputChannels[2].readEndOfPartitionEvent(); + inputGate.notifyChannelNonEmpty(inputChannels[2]); + inputGate.getNext(); + assertEquals(Collections.emptyList(), inputGate.getUnfinishedChannels()); + } + // --------------------------------------------------------------------------------------------- private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java index 368db2a..b11580e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -107,4 +108,9 @@ public class MockIndexedInputGate extends IndexedInputGate { public int getGateIndex() { return gateIndex; } + + @Override + public List<InputChannelInfo> getUnfinishedChannels() { + return Collections.emptyList(); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index 8a94c2f..9346be6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -166,4 +167,9 @@ public class MockInputGate extends IndexedInputGate { public int getGateIndex() { return 0; } + + @Override + public List<InputChannelInfo> getUnfinishedChannels() { + return Collections.emptyList(); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java index 532e0eb..16b4acb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Random; @@ -254,5 +255,10 @@ public class AlignedCheckpointsMassiveRandomTest { public int getGateIndex() { return 0; } + + @Override + public List<InputChannelInfo> getUnfinishedChannels() { + return Collections.emptyList(); + } } }
