This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9365510e8b0f5baaa25311bb44dd5c03d6773738 Author: Piotr Nowojski <[email protected]> AuthorDate: Mon Jan 31 17:49:07 2022 +0100 [FLINK-25827][task] Fix potential memory leak in SourceOperator when using CompletableFuture.anyOf --- .../streaming/api/operators/SourceOperator.java | 27 ++++++++++++---------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index eb82768..0866a52 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.runtime.io.DataInputStatus; +import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -487,24 +488,26 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private static class SourceOperatorAvailabilityHelper { private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>(); - private CompletableFuture<Void> currentReaderFuture; - private CompletableFuture<?> currentCombinedFuture; + private final MultipleFuturesAvailabilityHelper availabilityHelper; + + private SourceOperatorAvailabilityHelper() { + availabilityHelper = new MultipleFuturesAvailabilityHelper(2); + availabilityHelper.anyOf(0, forcedStopFuture); + } public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) { - if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) { - return sourceReaderFuture; - } else if (sourceReaderFuture == currentReaderFuture) { - return currentCombinedFuture; - } else { - currentReaderFuture = sourceReaderFuture; - currentCombinedFuture = - CompletableFuture.anyOf(forcedStopFuture, sourceReaderFuture); - return currentCombinedFuture; + if (sourceReaderFuture == AvailabilityProvider.AVAILABLE + || sourceReaderFuture.isDone()) { + return AvailabilityProvider.AVAILABLE; } + availabilityHelper.resetToUnAvailable(); + availabilityHelper.anyOf(0, forcedStopFuture); + availabilityHelper.anyOf(1, sourceReaderFuture); + return availabilityHelper.getAvailableFuture(); } public void forceStop() { - this.forcedStopFuture.complete(null); + forcedStopFuture.complete(null); } } }
