pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794476580
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
}
}
}
+
+ /** Visible for testing only. Do not use out side
StreamMultipleInputProcessor. */
+ @VisibleForTesting
+ public static class MultipleInputAvailabilityHelper {
+ private final CompletableFuture<?>[] cachedAvailableFutures;
+ private final Consumer[] onCompletion;
+ private CompletableFuture<?> availableFuture;
+
+ public CompletableFuture<?> getAvailableFuture() {
+ return availableFuture;
+ }
+
+ public static MultipleInputAvailabilityHelper newInstance(int
inputSize) {
+ MultipleInputAvailabilityHelper obj = new
MultipleInputAvailabilityHelper(inputSize);
+ return obj;
+ }
+
+ private MultipleInputAvailabilityHelper(int inputSize) {
+ this.cachedAvailableFutures = new CompletableFuture[inputSize];
+ this.onCompletion = new Consumer[inputSize];
+ }
+
+ @VisibleForTesting
+ public void init() {
+ for (int i = 0; i < cachedAvailableFutures.length; i++) {
+ final int inputIdx = i;
+ onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+ }
+ }
+
+ public boolean isInvalid() {
+ return availableFuture == null || availableFuture.isDone();
+ }
+
+ public void resetToUnavailable() {
+ availableFuture = new CompletableFuture<>();
+ }
+
+ private void notifyCompletion(int idx) {
+ if (availableFuture != null && !availableFuture.isDone()) {
+ availableFuture.complete(null);
+ }
+ cachedAvailableFutures[idx] = AVAILABLE;
+ }
Review comment:
> Even with volatile, the future completion can still happens inbetween
the maybeReset() call (let's say, after the isDone() check, and before the set
operation) and the completion callback will see the obsolete, old, already
completed anyAvailable future, and try to complete it.
> It's no difference. The AtomicReference is not preventing this.
AtomicReference's set and get are plain method on plain object.
As I was trying to explain above in the point 2.
> Take a closer look on the order of execution in my version methods
maybeReset() and registerFuture()
If input becomes available just after `anyAvailable.get().isDone()` check,
but before `anyAvailable.set(new CompletableFuture<>())`, the combined/returned
future will be available anyway. Such input would return `true` for
`needsToBeRegistered()` since its future would be completed, `registerFuture()`
would be executed, which would immediately complete the combined future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]