1u0 commented on a change in pull request #9483: [FLINK-13767][task] Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#discussion_r321783316
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 ##########
 @@ -158,43 +173,34 @@ public boolean isFinished() {
        }
 
        @Override
-       public boolean processInput() throws Exception {
-               if (!isPrepared) {
-                       // the preparations here are not placed in the constructor because all work in it
-                       // must be executed after all operators are opened.
-                       prepareForProcessing();
-               }
+       public InputStatus processInput() throws Exception {
+               // the preparations here are not placed in the constructor because all work in it
+               // must be executed after all operators are opened.
+               prepareForProcessing();
 
                int readingInputIndex = selectNextReadingInputIndex();
                if (readingInputIndex == -1) {
-                       return false;
+                       return InputStatus.NOTHING_AVAILABLE;
                }
                lastReadInputIndex = readingInputIndex;
 
-               StreamElement recordOrMark;
+               InputStatus status, anotherStatus;
                if (readingInputIndex == 0) {
-                       recordOrMark = input1.pollNextNullable();
-                       if (recordOrMark != null) {
-                               processElement1(recordOrMark, input1.getLastChannel());
-                       }
-                       checkFinished(input1, lastReadInputIndex);
+                       status = input1.emitNext(output1);
+                       firstInputStatus = status;
+                       anotherStatus = secondInputStatus;
                } else {
-                       recordOrMark = input2.pollNextNullable();
-                       if (recordOrMark != null) {
-                               processElement2(recordOrMark, input2.getLastChannel());
-                       }
-                       checkFinished(input2, lastReadInputIndex);
-               }
-
-               if (recordOrMark == null) {
-                       inputSelectionHandler.setUnavailableInput(readingInputIndex);
+                       status = input2.emitNext(output2);
+                       secondInputStatus = status;
+                       anotherStatus = firstInputStatus;
                }
+               checkFinished(status, lastReadInputIndex);
 
-               return recordOrMark != null;
+               return status == InputStatus.END_OF_INPUT ? anotherStatus : status;
 
 Review comment:
   I think a more proper way (which also works as a "performance optimization") would be to rewrite the return value as:
   ```java
   if (firstInputStatus == InputStatus.MORE_AVAILABLE || secondInputStatus == InputStatus.MORE_AVAILABLE) {
       return InputStatus.MORE_AVAILABLE;
   } else if (firstInputStatus == InputStatus.END_OF_INPUT && secondInputStatus == InputStatus.END_OF_INPUT) {
       return InputStatus.END_OF_INPUT;
   } else {
       return InputStatus.NOTHING_AVAILABLE;
   }
   ```
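
To make the difference concrete, here is a minimal standalone sketch that compares the return expression from the diff with the rule suggested in this comment. The `InputStatus` enum below is a local stand-in with only the three values that appear in this hunk, and the class and method names (`InputStatusSketch`, `current`, `combine`) are hypothetical, not part of the PR:

```java
public class InputStatusSketch {

    // Local stand-in for the InputStatus type used in the diff (assumption:
    // only the three values referenced in this hunk matter here).
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Return expression as written in the diff: 'status' belongs to the input
    // that was just read, 'anotherStatus' is the last known status of the other.
    static InputStatus current(InputStatus status, InputStatus anotherStatus) {
        return status == InputStatus.END_OF_INPUT ? anotherStatus : status;
    }

    // Rule suggested in this comment: symmetric in both inputs.
    static InputStatus combine(InputStatus first, InputStatus second) {
        if (first == InputStatus.MORE_AVAILABLE || second == InputStatus.MORE_AVAILABLE) {
            return InputStatus.MORE_AVAILABLE;
        } else if (first == InputStatus.END_OF_INPUT && second == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        } else {
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    public static void main(String[] args) {
        // Print every combination so the two rules can be compared side by side.
        for (InputStatus justRead : InputStatus.values()) {
            for (InputStatus other : InputStatus.values()) {
                System.out.println(justRead + " / " + other
                        + " -> current=" + current(justRead, other)
                        + ", suggested=" + combine(justRead, other));
            }
        }
    }
}
```

Over these three values, the two rules differ in exactly one combination: the input just read reports `NOTHING_AVAILABLE` while the other input's stored status is `MORE_AVAILABLE`. The expression in the diff returns `NOTHING_AVAILABLE` there, whereas the suggested rule returns `MORE_AVAILABLE`, which is presumably where the optimization would come from.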

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
