This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 761a4623ddac2b24a96d00fef33c32bcea29c92f Author: Piotr Nowojski <[email protected]> AuthorDate: Mon Jan 31 13:05:33 2022 +0100 [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper --- .../runtime/io/StreamMultipleInputProcessor.java | 91 ++++++++++------------ 1 file changed, 40 insertions(+), 51 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java index cb91d8e..ba7e46e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.streaming.api.operators.InputSelection; @@ -28,7 +27,8 @@ import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; + +import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; /** Input processor for {@link MultipleInputStreamOperator}. */ @Internal @@ -49,14 +49,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor StreamOneInputProcessor<?>[] inputProcessors) { this.inputSelectionHandler = inputSelectionHandler; this.inputProcessors = inputProcessors; - this.availabilityHelper = - MultipleInputAvailabilityHelper.newInstance(inputProcessors.length); - } - - @Override - public boolean isAvailable() { - return inputSelectionHandler.isAnyInputAvailable() - || inputSelectionHandler.areAllInputsFinished(); + this.availabilityHelper = new MultipleInputAvailabilityHelper(inputProcessors.length); } @Override @@ -65,11 +58,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor || inputSelectionHandler.areAllInputsFinished()) { return AVAILABLE; } - /* - * According to the following issue. The implementation of `CompletableFuture.anyOf` in jdk8 - * has some memory issue. This issue is fixed in jdk9. - * https://bugs.openjdk.java.net/browse/JDK-8160402 - */ + availabilityHelper.resetToUnAvailable(); for (int i = 0; i < inputProcessors.length; i++) { if (!inputSelectionHandler.isInputFinished(i) @@ -170,57 +159,57 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor } } - /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */ - @VisibleForTesting - public static class MultipleInputAvailabilityHelper { - private final CompletableFuture<?>[] cachedAvailableFutures; - private final Consumer[] onCompletion; - private volatile CompletableFuture<?> availableFuture; - + /** + * This class is semi-thread safe. Only method {@link #notifyCompletion()} is allowed to be + * executed from an outside of the task thread. + * + * <p>It solves a problem of a potential memory leak as described in FLINK-25728. In short we + * have to ensure, that if there is one input (future) that rarely (or never) completes, that + * such future would not prevent previously returned combined futures (like {@link + * CompletableFuture#anyOf(CompletableFuture[])} from being garbage collected. Additionally, we + * don't want to accumulate more and more completion stages on such rarely completed future, so + * we are registering {@link CompletableFuture#thenRun(Runnable)} only if it has not already + * been done. + * + * <p>Note {@link #resetToUnAvailable()} doesn't de register previously registered futures. If + * future was registered in the past, but for whatever reason now it is not, such future can + * still complete the newly created future. + * + * <p>It might be no longer needed after upgrading to JDK9 + * (https://bugs.openjdk.java.net/browse/JDK-8160402). + */ + private static class MultipleInputAvailabilityHelper { + private final CompletableFuture<?>[] futuresToCombine; + + private volatile CompletableFuture<?> availableFuture = new CompletableFuture<>(); + + public MultipleInputAvailabilityHelper(int inputSize) { + futuresToCombine = new CompletableFuture[inputSize]; + } + + /** @return combined future using anyOf logic */ public CompletableFuture<?> getAvailableFuture() { return availableFuture; } - public static MultipleInputAvailabilityHelper newInstance(int inputSize) { - MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize); - return obj; - } - - private MultipleInputAvailabilityHelper(int inputSize) { - this.cachedAvailableFutures = new CompletableFuture[inputSize]; - this.onCompletion = new Consumer[inputSize]; - availableFuture = new CompletableFuture<>(); - for (int i = 0; i < cachedAvailableFutures.length; i++) { - final int inputIdx = i; - onCompletion[i] = (Void) -> notifyCompletion(inputIdx); - } - } - - /** Reset availableFuture to fresh unavailable. */ public void resetToUnAvailable() { if (availableFuture.isDone()) { availableFuture = new CompletableFuture<>(); } } - private void notifyCompletion(int idx) { + private void notifyCompletion() { availableFuture.complete(null); - cachedAvailableFutures[idx] = AVAILABLE; } /** - * Implement `Or` logic. - * - * @param idx - * @param dep + * Combine {@code availabilityFuture} using anyOf logic with other previously registered + * futures. */ - public void anyOf(final int idx, CompletableFuture<?> dep) { - if (dep == AVAILABLE || dep.isDone()) { - cachedAvailableFutures[idx] = dep; - availableFuture.complete(null); - } else if (dep != cachedAvailableFutures[idx]) { - cachedAvailableFutures[idx] = dep; - dep.thenAccept(onCompletion[idx]); + public void anyOf(final int idx, CompletableFuture<?> availabilityFuture) { + if (futuresToCombine[idx] == null || futuresToCombine[idx].isDone()) { + futuresToCombine[idx] = availabilityFuture; + assertNoException(availabilityFuture.thenRun(this::notifyCompletion)); } } }
