1u0 commented on a change in pull request #9483: [FLINK-13767][task] Migrate isFinished method from AvailabilityListener to AsyncDataInput URL: https://github.com/apache/flink/pull/9483#discussion_r321783316
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ########## @@ -158,43 +173,34 @@ public boolean isFinished() { } @Override - public boolean processInput() throws Exception { - if (!isPrepared) { - // the preparations here are not placed in the constructor because all work in it - // must be executed after all operators are opened. - prepareForProcessing(); - } + public InputStatus processInput() throws Exception { + // the preparations here are not placed in the constructor because all work in it + // must be executed after all operators are opened. + prepareForProcessing(); int readingInputIndex = selectNextReadingInputIndex(); if (readingInputIndex == -1) { - return false; + return InputStatus.NOTHING_AVAILABLE; } lastReadInputIndex = readingInputIndex; - StreamElement recordOrMark; + InputStatus status, anotherStatus; if (readingInputIndex == 0) { - recordOrMark = input1.pollNextNullable(); - if (recordOrMark != null) { - processElement1(recordOrMark, input1.getLastChannel()); - } - checkFinished(input1, lastReadInputIndex); + status = input1.emitNext(output1); + firstInputStatus = status; + anotherStatus = secondInputStatus; } else { - recordOrMark = input2.pollNextNullable(); - if (recordOrMark != null) { - processElement2(recordOrMark, input2.getLastChannel()); - } - checkFinished(input2, lastReadInputIndex); - } - - if (recordOrMark == null) { - inputSelectionHandler.setUnavailableInput(readingInputIndex); + status = input2.emitNext(output2); + secondInputStatus = status; + anotherStatus = firstInputStatus; } + checkFinished(status, lastReadInputIndex); - return recordOrMark != null; + return status == InputStatus.END_OF_INPUT ? anotherStatus : status; Review comment: I think more proper way (and as "performance optimization") you can rewrite the return value as: ```java if (firstInputStatus == InputStatus.MORE_AVAILABLE || secondInputStatus == InputStatus.MORE_AVAILABLE) { return InputStatus.MORE_AVAILABLE; } else if (firstInputStatus == InputStatus.END_OF_INPUT && secondInputStatus == InputStatus.END_OF_INPUT) { return InputStatus.END_OF_INPUT; } else { return InputStatus.NOTHING_AVAILABLE; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services