zhijiangW commented on a change in pull request #9483: [FLINK-13767][task]
Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#discussion_r325477987
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -184,10 +184,6 @@ public boolean processInput() throws Exception {
}
checkFinished(status, lastReadInputIndex);
- if (status != InputStatus.MORE_AVAILABLE) {
Review comment:
Let me summarize the changes a bit. There are three main changes:
1. Migrate `inputSelectionHandler.setUnavailableInput` to integrate with
`selectNextReadingInputIndex#updateAvailability`. I think the behavior is the
same as before; this just puts similar logic together for easier tracing.
2. Return a status instead of a boolean from
`StreamInputProcessor#processInput`. This replaces the `isFinished` method and
keeps the return type consistent with `emitNext`. It also causes no behavior
change; the only difference is that `StreamTask#processInput` now checks two
additional conditions instead of using the previous `while` loop. We could
also keep the `while` loop, or add the following hot path, to avoid any
performance concern:
```
if (status == InputStatus.MORE_AVAILABLE) {
    return;
}
```
3. The value returned by `StreamTwoInputProcessor#processInput` differs from
before, which might cause different behavior. Previously, the current call
might return an invalid status, and the next call might then get -1 from
`selectNextReadingInputIndex()`, which triggers the `isAvailable()` access.
Now it always returns a valid status, determined from the next selected index
and its status. So if the next selected input is not available, the
`isAvailable()` access is triggered directly in the current call rather than
in the next one. My earlier thinking was that this might save some unnecessary
cost on the mailbox scheduling side.
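To make point 2 concrete, here is a minimal sketch of a caller that branches on the returned status and takes the hot path first. It is only a toy model: `InputStatus` and `InputProcessor` are local stand-ins rather than the real Flink types, and the `Action` enum and the `step` method are hypothetical names introduced for illustration.

```java
// Toy model only: these types are local stand-ins, not Flink's classes.
final class ProcessInputSketch {

    /** Stand-in for the status values discussed in this thread. */
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical processor that reports a status instead of a boolean. */
    interface InputProcessor {
        InputStatus processInput();
    }

    /** Hypothetical names for what the caller does after one call. */
    enum Action { CONTINUE, WAIT_FOR_INPUT, FINISH }

    static Action step(InputProcessor processor) {
        InputStatus status = processor.processInput();
        // Hot path: more data is ready, so skip the remaining checks.
        if (status == InputStatus.MORE_AVAILABLE) {
            return Action.CONTINUE;
        }
        // Slower path: distinguish end of input from a temporarily empty input.
        if (status == InputStatus.END_OF_INPUT) {
            return Action.FINISH;
        }
        return Action.WAIT_FOR_INPUT;
    }

    public static void main(String[] args) {
        System.out.println(step(() -> InputStatus.MORE_AVAILABLE));
        System.out.println(step(() -> InputStatus.NOTHING_AVAILABLE));
        System.out.println(step(() -> InputStatus.END_OF_INPUT));
    }
}
```

With the hot path first, the common case costs a single comparison, which is roughly what the `while` form paid before.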
Anyway, it is hard to verify the above points because the benchmark is
unstable, so we can only analyze the possible impact in theory.
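As a theoretical aid for point 3, the difference can be modelled with a small toy example. This is not Flink code: the `selected` and `available` arrays, `selectNext`, and both `statusOf...` methods are hypothetical, and treating the old behavior as "report the status of the input just read" is my simplifying assumption.

```java
// Toy model only: local stand-ins, not Flink's classes or selection logic.
final class TwoInputStatusSketch {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

    /** Returns the first input that is both selected and available, or -1. */
    static int selectNext(boolean[] selected, boolean[] available) {
        for (int i = 0; i < selected.length; i++) {
            if (selected[i] && available[i]) {
                return i;
            }
        }
        return -1;
    }

    /** Old style (as modelled here): report the status of the input just read. */
    static InputStatus statusOfLastRead(boolean[] available, int lastRead) {
        return available[lastRead]
                ? InputStatus.MORE_AVAILABLE
                : InputStatus.NOTHING_AVAILABLE;
    }

    /** New style: report the status of the input selected for the next call. */
    static InputStatus statusOfNextSelected(boolean[] selected, boolean[] available) {
        return selectNext(selected, available) >= 0
                ? InputStatus.MORE_AVAILABLE
                : InputStatus.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        boolean[] available = {true, false}; // input 0 still has data, input 1 is empty
        boolean[] selected = {false, true};  // but only input 1 may be read next
        System.out.println(statusOfLastRead(available, 0));
        System.out.println(statusOfNextSelected(selected, available));
    }
}
```

In this scenario the old-style result is `MORE_AVAILABLE`, so the caller comes back for another call that finds nothing to select (-1). The new-style result is `NOTHING_AVAILABLE` straight away, which is where the possible saving in mailbox scheduling would come from.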