pnowojski commented on a change in pull request #11098: [FLINK-16060][task] 
Implement working StreamMultipleInputProcessor
URL: https://github.com/apache/flink/pull/11098#discussion_r382681408
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java
 ##########
 @@ -110,8 +101,39 @@ public int fairSelectNextIndexOutOf2(int 
availableInputsMask, int lastReadInputI
                throw new UnsupportedOperationException("Only two inputs are 
supported.");
        }
 
-       private static boolean isALLMaskOf2(long inputMask) {
-               return (3 & inputMask) == 3;
+       /**
+        * Fairly select one of the available inputs for reading.
+        *
+        * @param availableInputsMask The mask of all available inputs.
+        * @param lastReadInputIndex The index of last read input.
+        * @return the index of the input for reading or -1, and -1 indicates 
no input is selected (
+        *         {@code inputMask} is empty or the inputs in {@code 
inputMask} are unavailable).
+        */
+       public int fairSelectNextIndex(int availableInputsMask, int 
lastReadInputIndex) {
+               int selectionMask = (int) inputMask;
+               int combineMask = availableInputsMask & selectionMask;
+
+               if (combineMask == 0) {
+                       return -1;
+               }
+
+               int nextReadInputIndex = fairSelectFromRightBits(combineMask, 
lastReadInputIndex + 1);
+               if (nextReadInputIndex >= 0) {
+                       return nextReadInputIndex;
+               }
+               return fairSelectFromRightBits(combineMask, 0);
+       }
+
+       private int fairSelectFromRightBits(int combineMask, int 
nextReadInputIndex) {
+               int rightBits = combineMask >> nextReadInputIndex;
+               while (rightBits > 0) {
+                       if (rightBits % 2 == 1) {
 
 Review comment:
   ~For me `% 2` is easier to understand, as `%` operator is used much more 
frequently compared to `&`. And there is zero performance difference between 
those two (both will be optimised to the same machine code)~
   
   scratch that, `& 1` behaves somehow better for negative inputs. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to