zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#discussion_r322300347
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -158,43 +173,34 @@ public boolean isFinished() {
}
@Override
- public boolean processInput() throws Exception {
- if (!isPrepared) {
- // the preparations here are not placed in the constructor because all work in it
- // must be executed after all operators are opened.
- prepareForProcessing();
- }
+ public InputStatus processInput() throws Exception {
+ // the preparations here are not placed in the constructor because all work in it
+ // must be executed after all operators are opened.
+ prepareForProcessing();
int readingInputIndex = selectNextReadingInputIndex();
if (readingInputIndex == -1) {
- return false;
+ return InputStatus.NOTHING_AVAILABLE;
}
lastReadInputIndex = readingInputIndex;
- StreamElement recordOrMark;
+ InputStatus status, anotherStatus;
if (readingInputIndex == 0) {
- recordOrMark = input1.pollNextNullable();
- if (recordOrMark != null) {
- processElement1(recordOrMark, input1.getLastChannel());
- }
- checkFinished(input1, lastReadInputIndex);
+ status = input1.emitNext(output1);
+ firstInputStatus = status;
+ anotherStatus = secondInputStatus;
} else {
- recordOrMark = input2.pollNextNullable();
- if (recordOrMark != null) {
- processElement2(recordOrMark, input2.getLastChannel());
- }
- checkFinished(input2, lastReadInputIndex);
- }
-
- if (recordOrMark == null) {
- inputSelectionHandler.setUnavailableInput(readingInputIndex);
+ status = input2.emitNext(output2);
+ secondInputStatus = status;
+ anotherStatus = firstInputStatus;
}
+ checkFinished(status, lastReadInputIndex);
- return recordOrMark != null;
+ return status == InputStatus.END_OF_INPUT ? anotherStatus : status;
Review comment:
I'm not quite sure about the performance optimization issue.
In terms of precision, my previous approach tried to keep the old behavior,
which only looks at the status of the current input, except when that input
has reached its end.
I think each of these approaches has bad cases.
E.g. if the current status is `MORE_AVAILABLE` but the next selection is the
other input, which is actually `NOTHING_AVAILABLE`, then the next call to
`processInput` would find that `selectNextReadingInputIndex()` returns -1,
so that call only wastes processing time.
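The bad case above can be reproduced with a small toy model. This is a hypothetical sketch, not Flink code: `WastedCallDemo`, `hasData`, `nextSelected` and `runUntilNothingAvailable` are invented for illustration, the nested `InputStatus` enum is a stand-in with the three constants used in the diff, and the selection is assumed to simply alternate between the two inputs after each read. Like the current approach, `processInput()` reports only the status of the input it just read.

```java
/**
 * Hypothetical toy model (not Flink code) of the bad case: after reading one input the
 * reported status is MORE_AVAILABLE, but the next selection points at the other, empty
 * input, so the following processInput() call does no work.
 */
final class WastedCallDemo {

    /** Stand-in for Flink's InputStatus with the three values used in the diff. */
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final boolean[] hasData; // hypothetical: whether each input currently has data
    private int nextSelected;        // hypothetical: the input the selection picks next
    private int wastedCalls;         // calls that found nothing to read

    WastedCallDemo(boolean firstHasData, boolean secondHasData, int firstSelected) {
        this.hasData = new boolean[] {firstHasData, secondHasData};
        this.nextSelected = firstSelected;
    }

    /** Toy selectNextReadingInputIndex(): -1 when the selected input has nothing to read. */
    private int selectNextReadingInputIndex() {
        return hasData[nextSelected] ? nextSelected : -1;
    }

    /** Toy processInput() that reports only the status of the input it just read. */
    InputStatus processInput() {
        int index = selectNextReadingInputIndex();
        if (index == -1) {
            wastedCalls++;
            return InputStatus.NOTHING_AVAILABLE;
        }
        // Assumption for the demo: the read input still has data afterwards,
        // and the selection alternates to the other input for the next call.
        nextSelected = 1 - index;
        return InputStatus.MORE_AVAILABLE;
    }

    /** Drives processInput() like a task loop: keep calling while MORE_AVAILABLE. */
    int runUntilNothingAvailable() {
        while (processInput() == InputStatus.MORE_AVAILABLE) {
            // keep processing
        }
        return wastedCalls;
    }
}
```

With the first input holding data and the second one empty, the first call reads input 0 and reports `MORE_AVAILABLE`, so the loop calls again; the second call finds `selectNextReadingInputIndex()` returning -1 and does no work.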
Even before `InputStatus` was introduced, `processInput` could have returned a
precise status by maintaining the last status of both inputs, as I did in this
PR (`firstInputStatus`, `secondInputStatus`). If we want to do that, we also
need to consider three factors together (`firstInputStatus`,
`secondInputStatus`, and the next selected input) to determine the final
precise status for `StreamTask`.
If we do so, the `selectNextReadingInputIndex()` call above could only return
-1 on the first call to `processInput`, and we could refactor that part at the
same time.
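One possible way to combine the three factors is sketched here, under stated assumptions: `PreciseStatus.precise` is a hypothetical helper, `InputStatus` is again a stand-in enum, `nextSelected` (0 or 1) is assumed to be known before `processInput` returns, and a finished input is assumed never to be selected for reading again. It reports the status of the input that will actually be read next, falling back to the other input only when that one has ended.

```java
/**
 * Hypothetical sketch of the three-factor idea: derive the status reported to the task
 * from the last status of both inputs plus the input the selection will read next.
 */
final class PreciseStatus {

    /** Stand-in for Flink's InputStatus with the three values used in the diff. */
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private PreciseStatus() {}

    static InputStatus precise(InputStatus firstInputStatus,
                               InputStatus secondInputStatus,
                               int nextSelected) {
        if (nextSelected != 0 && nextSelected != 1) {
            throw new IllegalArgumentException("nextSelected must be 0 or 1: " + nextSelected);
        }
        // Both inputs finished: the whole processor is finished.
        if (firstInputStatus == InputStatus.END_OF_INPUT
                && secondInputStatus == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        }
        InputStatus selected = nextSelected == 0 ? firstInputStatus : secondInputStatus;
        InputStatus other = nextSelected == 0 ? secondInputStatus : firstInputStatus;
        // Assumption: a finished input is never read again, so fall back to the other one.
        if (selected == InputStatus.END_OF_INPUT) {
            return other;
        }
        // Report the status of the input that will actually be read on the next call,
        // so MORE_AVAILABLE is not returned when that input has nothing to offer.
        return selected;
    }
}
```

For `firstInputStatus = MORE_AVAILABLE`, `secondInputStatus = NOTHING_AVAILABLE` and the selection pointing at the second input, this returns `NOTHING_AVAILABLE`, so the task would not issue the wasted `processInput` call described in the bad case above.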
----------------------------------------------------------------
This is an automated message from the Apache Git Service.