This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9365510e8b0f5baaa25311bb44dd5c03d6773738
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Jan 31 17:49:07 2022 +0100

    [FLINK-25827][task] Fix potential memory leak in SourceOperator when using 
CompletableFuture.anyOf
---
 .../streaming/api/operators/SourceOperator.java    | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index eb82768..0866a52 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -487,24 +488,26 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private static class SourceOperatorAvailabilityHelper {
         private final CompletableFuture<Void> forcedStopFuture = new 
CompletableFuture<>();
-        private CompletableFuture<Void> currentReaderFuture;
-        private CompletableFuture<?> currentCombinedFuture;
+        private final MultipleFuturesAvailabilityHelper availabilityHelper;
+
+        private SourceOperatorAvailabilityHelper() {
+            availabilityHelper = new MultipleFuturesAvailabilityHelper(2);
+            availabilityHelper.anyOf(0, forcedStopFuture);
+        }
 
         public CompletableFuture<?> update(CompletableFuture<Void> 
sourceReaderFuture) {
-            if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) {
-                return sourceReaderFuture;
-            } else if (sourceReaderFuture == currentReaderFuture) {
-                return currentCombinedFuture;
-            } else {
-                currentReaderFuture = sourceReaderFuture;
-                currentCombinedFuture =
-                        CompletableFuture.anyOf(forcedStopFuture, 
sourceReaderFuture);
-                return currentCombinedFuture;
+            if (sourceReaderFuture == AvailabilityProvider.AVAILABLE
+                    || sourceReaderFuture.isDone()) {
+                return AvailabilityProvider.AVAILABLE;
             }
+            availabilityHelper.resetToUnAvailable();
+            availabilityHelper.anyOf(0, forcedStopFuture);
+            availabilityHelper.anyOf(1, sourceReaderFuture);
+            return availabilityHelper.getAvailableFuture();
         }
 
         public void forceStop() {
-            this.forcedStopFuture.complete(null);
+            forcedStopFuture.complete(null);
         }
     }
 }

Reply via email to