This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f4de732da43d320167e2692d68bbb9692c62cdb Author: wangpengcheng <[email protected]> AuthorDate: Fri Jan 21 13:41:57 2022 +0800 [FLINK-25728][task] Avoid unnessesary CompletableFuture.thenRun calls on idle inputProcessor's avaiableFuture, preventing memory leaks. --- .../runtime/io/StreamMultipleInputProcessor.java | 83 +++++++++++++++++++--- 1 file changed, 75 insertions(+), 8 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java index baacf26..cb91d8e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.streaming.api.operators.InputSelection; @@ -27,8 +28,7 @@ import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; +import java.util.function.Consumer; /** Input processor for {@link MultipleInputStreamOperator}. */ @Internal @@ -37,6 +37,8 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor private final MultipleInputSelectionHandler inputSelectionHandler; private final StreamOneInputProcessor<?>[] inputProcessors; + + private final MultipleInputAvailabilityHelper availabilityHelper; /** Always try to read from the first input. */ private int lastReadInputIndex = 1; @@ -47,6 +49,14 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor StreamOneInputProcessor<?>[] inputProcessors) { this.inputSelectionHandler = inputSelectionHandler; this.inputProcessors = inputProcessors; + this.availabilityHelper = + MultipleInputAvailabilityHelper.newInstance(inputProcessors.length); + } + + @Override + public boolean isAvailable() { + return inputSelectionHandler.isAnyInputAvailable() + || inputSelectionHandler.areAllInputsFinished(); } @Override @@ -55,17 +65,19 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor || inputSelectionHandler.areAllInputsFinished()) { return AVAILABLE; } - final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>(); + /* + * According to the following issue. The implementation of `CompletableFuture.anyOf` in jdk8 + * has some memory issue. This issue is fixed in jdk9. + * https://bugs.openjdk.java.net/browse/JDK-8160402 + */ + availabilityHelper.resetToUnAvailable(); for (int i = 0; i < inputProcessors.length; i++) { if (!inputSelectionHandler.isInputFinished(i) && inputSelectionHandler.isInputSelected(i)) { - assertNoException( - inputProcessors[i] - .getAvailableFuture() - .thenRun(() -> anyInputAvailable.complete(null))); + availabilityHelper.anyOf(i, inputProcessors[i].getAvailableFuture()); } } - return anyInputAvailable; + return availabilityHelper.getAvailableFuture(); } @Override @@ -157,4 +169,59 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor } } } + + /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */ + @VisibleForTesting + public static class MultipleInputAvailabilityHelper { + private final CompletableFuture<?>[] cachedAvailableFutures; + private final Consumer[] onCompletion; + private volatile CompletableFuture<?> availableFuture; + + public CompletableFuture<?> getAvailableFuture() { + return availableFuture; + } + + public static MultipleInputAvailabilityHelper newInstance(int inputSize) { + MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize); + return obj; + } + + private MultipleInputAvailabilityHelper(int inputSize) { + this.cachedAvailableFutures = new CompletableFuture[inputSize]; + this.onCompletion = new Consumer[inputSize]; + availableFuture = new CompletableFuture<>(); + for (int i = 0; i < cachedAvailableFutures.length; i++) { + final int inputIdx = i; + onCompletion[i] = (Void) -> notifyCompletion(inputIdx); + } + } + + /** Reset availableFuture to fresh unavailable. */ + public void resetToUnAvailable() { + if (availableFuture.isDone()) { + availableFuture = new CompletableFuture<>(); + } + } + + private void notifyCompletion(int idx) { + availableFuture.complete(null); + cachedAvailableFutures[idx] = AVAILABLE; + } + + /** + * Implement `Or` logic. + * + * @param idx + * @param dep + */ + public void anyOf(final int idx, CompletableFuture<?> dep) { + if (dep == AVAILABLE || dep.isDone()) { + cachedAvailableFutures[idx] = dep; + availableFuture.complete(null); + } else if (dep != cachedAvailableFutures[idx]) { + cachedAvailableFutures[idx] = dep; + dep.thenAccept(onCompletion[idx]); + } + } + } }
