StefanRRichter commented on a change in pull request #8826: 
[FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox
URL: https://github.com/apache/flink/pull/8826#discussion_r299006756
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##########
 @@ -195,14 +221,12 @@ public void cleanup() throws Exception {
                }
        }
 
-       private int selectNextReadingInputIndex()
-               throws InterruptedException, ExecutionException, IOException {
-
-               int readingInputIndex;
-               while ((readingInputIndex = 
inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, 
lastReadInputIndex)) == -1) {
-                       if (!waitForAvailableInput(inputSelection)) {
-                               return -1;
-                       }
+       private int selectNextReadingInputIndex() throws IOException {
+               updateAvailability();
 
 Review comment:
   Looks weird to me to have this check (and maybe also the one on the next 
line) in this method. I would call this as part of `processInput` instead, e.g. 
before this method is called. I know it does the same, but somehow it feels 
like the check belong into the caller method and not down here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to