wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794387909



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side 
StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int 
inputSize) {
+            MultipleInputAvailabilityHelper obj = new 
MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > So I don't see any race condition in my version. Yours works quite 
similar after all, but your version's lack of AtomicReference creates an 
opportunity, for example in point 3., that if input becomes available, it will 
attempt to complete wrong, obsolete, old, already completed future.
   
   The race condition exists in both of our fix. The `AtomicReference` without 
`CAS` operations is no difference with plain field reference. We both relies on 
redundant completion check to make sure that we are not missing any completion 
notification.
   
   I'm quite sure, replace the `AtomicReference` with plain field will make no 
difference.
   But this it not saying that your fix is not working. But only some opinion 
to the `AtomicReference` usage.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to