zhijiangW commented on a change in pull request #9483: [FLINK-13767][task]
Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#discussion_r319454145
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
##########
@@ -194,23 +193,28 @@ public boolean processInput() throws Exception {
int readingInputIndex = selectNextReadingInputIndex();
if (readingInputIndex == -1) {
- return false;
+ return InputStatus.NOTHING_AVAILABLE;
}
lastReadInputIndex = readingInputIndex;
InputStatus status;
if (readingInputIndex == 0) {
status = input1.emitNext(output1);
+ firstInputStatus = status;
} else {
status = input2.emitNext(output2);
+ secondInputStatus = status;
}
checkFinished(status, lastReadInputIndex);
if (status == InputStatus.NOTHING_AVAILABLE) {
inputSelectionHandler.setUnavailableInput(readingInputIndex);
}
- return status == InputStatus.MORE_AVAILABLE;
+ if (status == InputStatus.END_OF_INPUT && secondInputStatus !=
InputStatus.END_OF_INPUT) {
Review comment:
I think I just keep the same behavior as before. Although I also felt this
logic seem a bit tough to understand, I have not thought through whether we
need to refactor this logic beforehand.
If `status == NOTHING_AVAILABLE` and `secondInputStatus == MORE_AVAILABLE`,
it would return `status` and in `StreamTask#performDefaultAction` it would call
`inputProcessor.isAvailable().thenRun()`.
Before refactoring, if `status == NOTHING_AVAILABLE` which corresponds to
the case of returning null `recordOrMark`, then in
`StreamTask#performDefaultAction` it would still call
`inputProcessor.isAvailable().thenRun()`.
Also in previous behavior only two inputs are both finished, then we can
return `status==END_OF_INPUT`. If only current `status== END_OF_INPUT` then the
returned `recordOrMark` should be null, so in `StreamTask#performDefaultAction`
it would also enter the path of `inputProcessor.isAvailable().thenRun()`. In
order to keep this same behavior we need to return `NOTHING_AVAILABLE` here to
enter the same path in `StreamTask`.
So this part was my main concern before, whether it is worth doing this
refactor here. Because we add this condition `if (status ==
InputStatus.END_OF_INPUT && secondInputStatus != InputStatus.END_OF_INPUT)` to
always check in every call for keeping the previous behavior.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services