rkhachatryan commented on a change in pull request #12575:
URL: https://github.com/apache/flink/pull/12575#discussion_r438059312
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
##########
@@ -38,18 +40,18 @@ public static InputGate createInputGate(Collection<IndexedInputGate> inputGates1
List<IndexedInputGate> gates = new ArrayList<>(inputGates1.size() + inputGates2.size());
gates.addAll(inputGates1);
gates.addAll(inputGates2);
- return createInputGate(gates.toArray(new IndexedInputGate[gates.size()]));
+ return createInputGate(gates);
}
- public static InputGate createInputGate(IndexedInputGate[] inputGates) {
- if (inputGates.length <= 0) {
+ public static InputGate createInputGate(Collection<IndexedInputGate> inputGates) {
Review comment:
Replacing the array with a `Collection` means the ordering is no longer
guaranteed. Why not use `List` instead?
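
To illustrate the concern, here is a minimal sketch (not Flink code). `GateStub` and `OrderedGatesSketch` are hypothetical stand-ins I made up for this comment; the only point is that a `List` parameter makes ordering part of the method contract, whereas a `Collection` parameter would also accept a `HashSet`, whose iteration order is unspecified.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for IndexedInputGate; only the gate index matters here.
final class GateStub {
    private final int gateIndex;

    GateStub(int gateIndex) {
        this.gateIndex = gateIndex;
    }

    int getGateIndex() {
        return gateIndex;
    }
}

final class OrderedGatesSketch {
    // Taking a List (not a Collection) makes the ordering part of the contract:
    // passing a HashSet, whose iteration order is unspecified, would not compile.
    static int[] gateIndexesInOrder(List<GateStub> gates) {
        int[] indexes = new int[gates.size()];
        for (int i = 0; i < gates.size(); i++) {
            indexes[i] = gates.get(i).getGateIndex();
        }
        return indexes;
    }

    public static void main(String[] args) {
        List<GateStub> gates = Arrays.asList(new GateStub(2), new GateStub(0), new GateStub(1));
        // Prints the indexes in the order the caller supplied them.
        System.out.println(Arrays.toString(gateIndexesInOrder(gates)));
    }
}
```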
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -28,39 +29,28 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.IntStream;
/**
* Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
* for {@link StreamOneInputProcessor} and {@link StreamTwoInputProcessor}.
*/
@Internal
public class InputProcessorUtil {
-
+ @SuppressWarnings("unchecked")
public static CheckpointedInputGate createCheckpointedInputGate(
AbstractInvokable toNotifyOnCheckpoint,
StreamConfig config,
SubtaskCheckpointCoordinator checkpointCoordinator,
IndexedInputGate[] inputGates,
TaskIOMetricGroup taskIOMetricGroup,
String taskName) {
- InputGate inputGate = InputGateUtil.createInputGate(inputGates);
- CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+ return createCheckpointedMultipleInputGate(
+ toNotifyOnCheckpoint,
config,
- Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels),
checkpointCoordinator,
+ taskIOMetricGroup,
taskName,
- generateChannelIndexToInputGateMap(inputGate),
- generateInputGateToChannelIndexOffsetMap(inputGate),
- toNotifyOnCheckpoint);
- registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
-
- barrierHandler.getBufferReceivedListener().ifPresent(inputGate::registerBufferReceivedListener);
-
- return new CheckpointedInputGate(inputGate, barrierHandler);
+ Arrays.asList(inputGates))[0];
Review comment:
Can we add a check that the returned value is a single-element array, to
prevent future bugs?
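
Something along these lines is what I have in mind. This is only a sketch in plain Java: `SingleElementSketch` and `onlyElement` are hypothetical names, and in the actual code a precondition utility such as `Preconditions.checkState` would probably be the more natural choice.

```java
final class SingleElementSketch {
    // Hypothetical helper: returns the sole element, or fails fast if the
    // array does not contain exactly one element.
    static <T> T onlyElement(T[] values) {
        if (values == null || values.length != 1) {
            String actual = values == null ? "null" : String.valueOf(values.length);
            throw new IllegalStateException(
                "Expected exactly one element, but got: " + actual);
        }
        return values[0];
    }

    public static void main(String[] args) {
        // A single-element array passes the check and yields its element.
        System.out.println(onlyElement(new String[] {"gate"}));
    }
}
```

With a helper like this, the `[0]` access would fail loudly if the multiple-input method ever started returning more (or fewer) gates than expected.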
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -91,31 +91,25 @@
private final ThreadSafeUnaligner threadSafeUnaligner;
CheckpointBarrierUnaligner(
- int[] numberOfInputChannelsPerGate,
SubtaskCheckpointCoordinator checkpointCoordinator,
String taskName,
- AbstractInvokable toNotifyOnCheckpoint) {
+ AbstractInvokable toNotifyOnCheckpoint,
+ IndexedInputGate... inputGates) {
super(toNotifyOnCheckpoint);
this.taskName = taskName;
-
- final int numGates = numberOfInputChannelsPerGate.length;
-
- gateChannelOffsets = new int[numGates];
- for (int index = 1; index < numGates; index++) {
- gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
- }
-
- final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1];
- hasInflightBuffers = new boolean[totalNumChannels];
-
- channelInfos = IntStream.range(0, numGates)
- .mapToObj(gateIndex -> IntStream.range(0, numberOfInputChannelsPerGate[gateIndex])
- .mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex)))
- .flatMap(Function.identity())
+ this.channelInfos = Arrays.stream(inputGates)
+ .flatMap(gate -> gate.getChannels().stream().map(InputChannel::getChannelInfo))
.toArray(InputChannelInfo[]::new);
-
- threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(checkpointCoordinator), this);
+ hasInflightBuffers = new boolean[channelInfos.length];
+ threadSafeUnaligner = new ThreadSafeUnaligner(channelInfos.length, checkNotNull(checkpointCoordinator), this);
+
+ gateChannelOffsets = new int[inputGates.length];
+ int offset = 0;
+ for (final IndexedInputGate gate: inputGates) {
+ gateChannelOffsets[gate.getGateIndex()] = offset;
+ offset += gate.getNumberOfInputChannels();
+ }
Review comment:
I see similar code in the `CheckpointBarrierAligner` constructor and again in
`InputProcessorUtil.createCheckpointedInputGates`.
WDYT about extracting it into something like `InputGateResolver` and passing
it to the barrier handlers, input gates, and whoever else needs it?
Ideally, I think we should use self-sufficient IDs, but I guess that would
be too invasive a change.
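
A rough sketch of the extraction, to make the idea concrete. Everything here is hypothetical: the nested `Gate` class stands in for `IndexedInputGate`, the method names are made up, and the sketch assumes gate indexes are dense (`0..n-1`), as the offset loop in this diff already does.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that owns the gate/channel offset arithmetic in one place.
final class InputGateResolver {

    // Hypothetical stand-in for IndexedInputGate.
    static final class Gate {
        final int gateIndex;
        final int numberOfInputChannels;

        Gate(int gateIndex, int numberOfInputChannels) {
            this.gateIndex = gateIndex;
            this.numberOfInputChannels = numberOfInputChannels;
        }
    }

    private final int[] gateChannelOffsets;
    private final int totalNumberOfChannels;

    InputGateResolver(List<Gate> gates) {
        gateChannelOffsets = new int[gates.size()];
        int offset = 0;
        // Same loop as in the constructor above: each gate's offset is the
        // number of channels in the gates iterated before it.
        for (Gate gate : gates) {
            gateChannelOffsets[gate.gateIndex] = offset;
            offset += gate.numberOfInputChannels;
        }
        totalNumberOfChannels = offset;
    }

    // Maps a (gate index, channel index) pair to a flat channel index.
    int flattenedChannelIndex(int gateIndex, int channelIndex) {
        return gateChannelOffsets[gateIndex] + channelIndex;
    }

    int getTotalNumberOfChannels() {
        return totalNumberOfChannels;
    }

    public static void main(String[] args) {
        InputGateResolver resolver =
            new InputGateResolver(Arrays.asList(new Gate(0, 2), new Gate(1, 3)));
        // Channel 0 of gate 1 follows the two channels of gate 0.
        System.out.println(resolver.flattenedChannelIndex(1, 0));
    }
}
```

The barrier handlers and `createCheckpointedInputGates` could then share one instance instead of each recomputing the offsets.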