This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 761a4623ddac2b24a96d00fef33c32bcea29c92f
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Jan 31 13:05:33 2022 +0100

    [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper
---
 .../runtime/io/StreamMultipleInputProcessor.java   | 91 ++++++++++------------
 1 file changed, 40 insertions(+), 51 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index cb91d8e..ba7e46e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.streaming.api.operators.InputSelection;
@@ -28,7 +27,8 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+
+import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
 
 /** Input processor for {@link MultipleInputStreamOperator}. */
 @Internal
@@ -49,14 +49,7 @@ public final class StreamMultipleInputProcessor implements 
StreamInputProcessor
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
-        this.availabilityHelper =
-                
MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
-    }
-
-    @Override
-    public boolean isAvailable() {
-        return inputSelectionHandler.isAnyInputAvailable()
-                || inputSelectionHandler.areAllInputsFinished();
+        this.availabilityHelper = new 
MultipleInputAvailabilityHelper(inputProcessors.length);
     }
 
     @Override
@@ -65,11 +58,7 @@ public final class StreamMultipleInputProcessor implements 
StreamInputProcessor
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        /*
-         * According to the following issue. The implementation of 
`CompletableFuture.anyOf` in jdk8
-         * has some memory issue. This issue is fixed in jdk9.
-         * https://bugs.openjdk.java.net/browse/JDK-8160402
-         */
+
         availabilityHelper.resetToUnAvailable();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
@@ -170,57 +159,57 @@ public final class StreamMultipleInputProcessor 
implements StreamInputProcessor
         }
     }
 
-    /** Visible for testing only. Do not use out side of 
StreamMultipleInputProcessor. */
-    @VisibleForTesting
-    public static class MultipleInputAvailabilityHelper {
-        private final CompletableFuture<?>[] cachedAvailableFutures;
-        private final Consumer[] onCompletion;
-        private volatile CompletableFuture<?> availableFuture;
-
+    /**
+     * This class is semi-thread safe. Only method {@link #notifyCompletion()} 
is allowed to be
+     * executed from an outside of the task thread.
+     *
+     * <p>It solves a problem of a potential memory leak as described in 
FLINK-25728. In short we
+     * have to ensure, that if there is one input (future) that rarely (or 
never) completes, that
+     * such future would not prevent previously returned combined futures 
(like {@link
+     * CompletableFuture#anyOf(CompletableFuture[])} from being garbage 
collected. Additionally, we
+     * don't want to accumulate more and more completion stages on such rarely 
completed future, so
+     * we are registering {@link CompletableFuture#thenRun(Runnable)} only if 
it has not already
+     * been done.
+     *
+     * <p>Note {@link #resetToUnAvailable()} doesn't de register previously 
registered futures. If
+     * future was registered in the past, but for whatever reason now it is 
not, such future can
+     * still complete the newly created future.
+     *
+     * <p>It might be no longer needed after upgrading to JDK9
+     * (https://bugs.openjdk.java.net/browse/JDK-8160402).
+     */
+    private static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] futuresToCombine;
+
+        private volatile CompletableFuture<?> availableFuture = new 
CompletableFuture<>();
+
+        public MultipleInputAvailabilityHelper(int inputSize) {
+            futuresToCombine = new CompletableFuture[inputSize];
+        }
+
+        /** @return combined future using anyOf logic */
         public CompletableFuture<?> getAvailableFuture() {
             return availableFuture;
         }
 
-        public static MultipleInputAvailabilityHelper newInstance(int 
inputSize) {
-            MultipleInputAvailabilityHelper obj = new 
MultipleInputAvailabilityHelper(inputSize);
-            return obj;
-        }
-
-        private MultipleInputAvailabilityHelper(int inputSize) {
-            this.cachedAvailableFutures = new CompletableFuture[inputSize];
-            this.onCompletion = new Consumer[inputSize];
-            availableFuture = new CompletableFuture<>();
-            for (int i = 0; i < cachedAvailableFutures.length; i++) {
-                final int inputIdx = i;
-                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
-            }
-        }
-
-        /** Reset availableFuture to fresh unavailable. */
         public void resetToUnAvailable() {
             if (availableFuture.isDone()) {
                 availableFuture = new CompletableFuture<>();
             }
         }
 
-        private void notifyCompletion(int idx) {
+        private void notifyCompletion() {
             availableFuture.complete(null);
-            cachedAvailableFutures[idx] = AVAILABLE;
         }
 
         /**
-         * Implement `Or` logic.
-         *
-         * @param idx
-         * @param dep
+         * Combine {@code availabilityFuture} using anyOf logic with other 
previously registered
+         * futures.
          */
-        public void anyOf(final int idx, CompletableFuture<?> dep) {
-            if (dep == AVAILABLE || dep.isDone()) {
-                cachedAvailableFutures[idx] = dep;
-                availableFuture.complete(null);
-            } else if (dep != cachedAvailableFutures[idx]) {
-                cachedAvailableFutures[idx] = dep;
-                dep.thenAccept(onCompletion[idx]);
+        public void anyOf(final int idx, CompletableFuture<?> 
availabilityFuture) {
+            if (futuresToCombine[idx] == null || 
futuresToCombine[idx].isDone()) {
+                futuresToCombine[idx] = availabilityFuture;
+                
assertNoException(availabilityFuture.thenRun(this::notifyCompletion));
             }
         }
     }

Reply via email to