wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794663542



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +170,68 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side of 
StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int 
inputSize) {
+            MultipleInputAvailabilityHelper obj = new 
MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        /**
+         * Check the finished state of availableFuture. Reuse if possible. 
Renew {availableFuture}
+         * if previous availableFuture is already completed.
+         *
+         * @return true if availableFuture is renewed. false, reuse previous 
availableFuture.
+         */
+        public boolean checkReusableAndReset() {

Review comment:
       This reusable check maybe useless. When the inputProcessor's 
`getAvailableFuture` is called, the `availableFuture` is definitely completed. 
This reusable check shall be removed. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to