zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#discussion_r319454145
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##########
 @@ -194,23 +193,28 @@ public boolean processInput() throws Exception {
 
                int readingInputIndex = selectNextReadingInputIndex();
                if (readingInputIndex == -1) {
-                       return false;
+                       return InputStatus.NOTHING_AVAILABLE;
                }
                lastReadInputIndex = readingInputIndex;
 
                InputStatus status;
                if (readingInputIndex == 0) {
                        status = input1.emitNext(output1);
+                       firstInputStatus = status;
                } else {
                        status = input2.emitNext(output2);
+                       secondInputStatus = status;
                }
                checkFinished(status, lastReadInputIndex);
 
                if (status == InputStatus.NOTHING_AVAILABLE) {
                        inputSelectionHandler.setUnavailableInput(readingInputIndex);
                }
 
-               return status == InputStatus.MORE_AVAILABLE;
+               if (status == InputStatus.END_OF_INPUT && secondInputStatus != InputStatus.END_OF_INPUT) {
 
 Review comment:
   I think this just keeps the same behavior as before. Although I also find 
this logic a bit hard to understand, I have not yet thought through whether we 
need to refactor it beforehand.
    
   If `status == NOTHING_AVAILABLE` and `secondInputStatus == MORE_AVAILABLE`, 
it returns `status`, and `StreamTask#performDefaultAction` then calls 
`inputProcessor.isAvailable().thenRun()`. 
   
   Before the refactoring, `status == NOTHING_AVAILABLE` corresponds to the 
case of returning a null `recordOrMark`, and in that case 
`StreamTask#performDefaultAction` would likewise call 
`inputProcessor.isAvailable().thenRun()`. 
   
   Also, in the previous behavior we could return `status == END_OF_INPUT` only 
when both inputs were finished. If only the current `status == END_OF_INPUT`, 
the returned `recordOrMark` would be null, so `StreamTask#performDefaultAction` 
would also take the path of `inputProcessor.isAvailable().thenRun()`. To keep 
this behavior, we need to return `NOTHING_AVAILABLE` here so that `StreamTask` 
takes the same path.
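
   To make the intended mapping concrete, here is a minimal, self-contained 
sketch. It is not the code in this PR: `TwoInputStatusSketch`, 
`reportedStatus` and the `other` parameter are hypothetical names, the enum is 
only a stand-in for `InputStatus`, and treating both inputs symmetrically is a 
simplifying assumption (the condition in the diff checks `secondInputStatus`).

```java
// Hypothetical sketch for discussion only; not the actual Flink classes.
final class TwoInputStatusSketch {

    // Stand-in for InputStatus, reduced to the three values discussed here.
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /**
     * Maps the status of the input that was just read to the status reported
     * to the caller. END_OF_INPUT is reported only once the other input has
     * finished as well; otherwise NOTHING_AVAILABLE is reported so that the
     * caller waits for availability instead of finishing.
     */
    static InputStatus reportedStatus(InputStatus current, InputStatus other) {
        if (current == InputStatus.END_OF_INPUT && other != InputStatus.END_OF_INPUT) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        return current;
    }

    public static void main(String[] args) {
        // Only the current input has ended: the caller should keep waiting.
        System.out.println(reportedStatus(InputStatus.END_OF_INPUT, InputStatus.MORE_AVAILABLE));
        // Both inputs have ended: the end of input is reported.
        System.out.println(reportedStatus(InputStatus.END_OF_INPUT, InputStatus.END_OF_INPUT));
        // Any other status is passed through unchanged.
        System.out.println(reportedStatus(InputStatus.MORE_AVAILABLE, InputStatus.END_OF_INPUT));
    }
}
```

   The point of the sketch is that this extra branch is evaluated on every 
call, even though it only changes the result when exactly one input has ended.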
   
   So this was my main concern earlier: whether this refactoring is worth 
doing here, because we have to add the condition `if (status == 
InputStatus.END_OF_INPUT && secondInputStatus != InputStatus.END_OF_INPUT)` and 
check it on every call just to keep the previous behavior. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
