pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r676437897



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -80,11 +79,19 @@ public InputStatus processInput() throws Exception {
             readingInputIndex = selectFirstReadingInputIndex();
         }
         if (readingInputIndex == InputSelection.NONE_AVAILABLE) {
-            return InputStatus.NOTHING_AVAILABLE;
+            return DataInputStatus.NOTHING_AVAILABLE;
         }
 
         lastReadInputIndex = readingInputIndex;
-        InputStatus inputStatus = 
inputProcessors[readingInputIndex].processInput();
+        DataInputStatus inputStatus = 
inputProcessors[readingInputIndex].processInput();
+        if (inputStatus == DataInputStatus.END_OF_DATA) {
+            inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
+            if (inputSelectionHandler.areAllDataInputsFinished()) {
+                return DataInputStatus.END_OF_DATA;
+            } else {
+                inputStatus = 
inputProcessors[readingInputIndex].processInput();
+            }
+        }

Review comment:
       Move to `updateStatus()`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleInputSelectionHandler.java
##########
@@ -82,39 +87,48 @@ public InputStatus updateStatus(InputStatus inputStatus, 
int inputIndex) throws
         return calculateOverallStatus();
     }
 
-    public InputStatus calculateOverallStatus() throws IOException {
+    public DataInputStatus calculateOverallStatus() throws IOException {
         if (areAllInputsFinished()) {
-            return InputStatus.END_OF_INPUT;
+            return DataInputStatus.END_OF_INPUT;
         }
 
         if (isAnyInputAvailable()) {
-            return InputStatus.MORE_AVAILABLE;
+            return DataInputStatus.MORE_AVAILABLE;
         } else {
-            long selectedNotFinishedInputMask =
-                    inputSelection.getInputMask() & notFinishedInputsMask;
+            long selectedNotFinishedInputMask = selectedInputsMask & 
notFinishedInputsMask;
             if (selectedNotFinishedInputMask == 0) {
                 throw new IOException(
                         "Can not make a progress: all selected inputs are 
already finished");
             }
-            return InputStatus.NOTHING_AVAILABLE;
+            return DataInputStatus.NOTHING_AVAILABLE;
         }
     }
 
     void nextSelection() {
-        if (inputSelectable == null) {
-            inputSelection = InputSelection.ALL;
+        if (inputSelectable == null || areAllDataInputsFinished()) {
+            selectedInputsMask = InputSelection.ALL.getInputMask();
+        } else if (notFinishedDataInputsMask != allSelectedMask
+                && notFinishedDataInputsMask != notFinishedInputsMask) {
+            long dataFinishedButNotPartition =
+                    ((~notFinishedDataInputsMask ^ ~notFinishedInputsMask)
+                            & ~notFinishedDataInputsMask);
+            selectedInputsMask =
+                    (inputSelectable.nextSelection().getInputMask() | 
dataFinishedButNotPartition)
+                            & allSelectedMask;
         } else {

Review comment:
       Update this once per `updateStatus()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to