rkhachatryan commented on a change in pull request #11098: [FLINK-16060][task] 
Implement working StreamMultipleInputProcessor
URL: https://github.com/apache/flink/pull/11098#discussion_r380691220
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 ##########
 @@ -63,40 +63,57 @@ public static CheckpointedInputGate 
createCheckpointedInputGate(
        public static CheckpointedInputGate[] createCheckpointedInputGatePair(
                        AbstractInvokable toNotifyOnCheckpoint,
                        CheckpointingMode checkpointMode,
-                       InputGate inputGate1,
-                       InputGate inputGate2,
                        Configuration taskManagerConfig,
                        TaskIOMetricGroup taskIOMetricGroup,
-                       String taskName) {
+                       String taskName,
+                       InputGate ...inputGates) {
 
                int pageSize = 
ConfigurationParserUtils.getPageSize(taskManagerConfig);
 
-               BufferStorage mainBufferStorage1 = createBufferStorage(
-                       checkpointMode, pageSize, taskManagerConfig, taskName);
-               BufferStorage mainBufferStorage2 = createBufferStorage(
-                       checkpointMode, pageSize, taskManagerConfig, taskName);
-               checkState(mainBufferStorage1.getMaxBufferedBytes() == 
mainBufferStorage2.getMaxBufferedBytes());
+               BufferStorage[] mainBufferStorages = new 
BufferStorage[inputGates.length];
+               for (int i = 0; i < inputGates.length; i++) {
+                       mainBufferStorages[i] = createBufferStorage(
+                               checkpointMode, pageSize, taskManagerConfig, 
taskName);
+               }
+
+               BufferStorage[] linkedBufferStorages = new 
BufferStorage[inputGates.length];
 
-               BufferStorage linkedBufferStorage1 = new LinkedBufferStorage(
-                       mainBufferStorage1,
-                       mainBufferStorage2,
-                       mainBufferStorage1.getMaxBufferedBytes());
-               BufferStorage linkedBufferStorage2 = new LinkedBufferStorage(
-                       mainBufferStorage2,
-                       mainBufferStorage1,
-                       mainBufferStorage1.getMaxBufferedBytes());
+               for (int i = 0; i < inputGates.length; i++) {
+                       linkedBufferStorages[i] = new LinkedBufferStorage(
+                               mainBufferStorages[i],
+                               mainBufferStorages[i].getMaxBufferedBytes(),
+                               copyBufferStoragesExceptOf(i, 
mainBufferStorages));
+               }
 
                CheckpointBarrierHandler barrierHandler = 
createCheckpointBarrierHandler(
                        checkpointMode,
-                       inputGate1.getNumberOfInputChannels() + 
inputGate2.getNumberOfInputChannels(),
+                       
Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum(),
                        taskName,
                        toNotifyOnCheckpoint);
                registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
 
-               return new CheckpointedInputGate[] {
-                       new CheckpointedInputGate(inputGate1, 
linkedBufferStorage1, barrierHandler),
-                       new CheckpointedInputGate(inputGate2, 
linkedBufferStorage2, barrierHandler, inputGate1.getNumberOfInputChannels())
-               };
+               CheckpointedInputGate[] checkpointedInputGates = new 
CheckpointedInputGate[inputGates.length];
+
+               int channelIndexOffset = 0;
+               for (int i = 0; i < inputGates.length; i++) {
+                       checkpointedInputGates[i] = new 
CheckpointedInputGate(inputGates[i], linkedBufferStorages[i], barrierHandler, 
channelIndexOffset);
+                       channelIndexOffset += 
inputGates[i].getNumberOfInputChannels();
+               }
+
+               return checkpointedInputGates;
+       }
+
+       private static BufferStorage[] copyBufferStoragesExceptOf(
+                       int skipStorage,
+                       BufferStorage[] bufferStorages) {
+               BufferStorage[] copy = new BufferStorage[bufferStorages.length 
- 1];
+               int copyTo = 0;
+               for (int copyFrom = 0; copyFrom < bufferStorages.length; 
copyFrom++) {
+                       if (copyFrom != skipStorage) {
+                               copy[copyTo++] = bufferStorages[copyFrom];
+                       }
+               }
+               return copy;
 
 Review comment:
   Can we improve it with two `arrayCopy` calls:
   ```
   System.arraycopy(bufferStorages, 0, copy, 0, skipStorage);
   System.arraycopy(bufferStorages, skipStorage + 1, copy, skipStorage, 
bufferStorages.length - skipStorage - 1);
   ```
   ?
   Also, variable names don't say much to me (just `src`, `dst` and `i` would 
be better).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to