This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 750311755b55ea2abf3313798453dcfefc36675a
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Aug 29 13:34:38 2023 +0200

    [FLINK-32751][streaming] Fixes bug where the enqueued requests are not 
properly cancelled
---
 .../collect/CollectSinkOperatorCoordinator.java      |  5 +++++
 .../collect/CollectSinkOperatorCoordinatorTest.java  | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index 13957b297a5..74b5de2d136 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -91,6 +91,11 @@ public class CollectSinkOperatorCoordinator
     public void close() throws Exception {
         LOG.info("Closing the CollectSinkOperatorCoordinator.");
         this.executorService.shutdownNow();
+
+        // cancelling all ongoing requests explicitly
+        ongoingRequests.forEach(ft -> ft.cancel(true));
+        ongoingRequests.clear();
+
         closeConnection();
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index ac89d264ed0..f84511aa095 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -171,6 +171,26 @@ class CollectSinkOperatorCoordinatorTest {
         }
     }
 
+    @Test
+    void testCoordinatorNotConnectingToTheSinkFunctionSocket() throws 
Exception {
+        try (final TestingSinkFunction sinkFunction =
+                
TestingSinkFunction.createTestingSinkFunctionWithoutConnection()) {
+            final CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator();
+            coordinator.start();
+            sinkFunction.registerSinkFunctionWith(coordinator);
+
+            final String expectedVersion = "version";
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    coordinator.handleCoordinationRequest(
+                            
createRequestForCoordinatorGeneratedResponse(expectedVersion));
+            assertThat(responseFuture).isNotDone();
+
+            // closing th coordinator before the request is sent should result 
in an empty response
+            coordinator.close();
+            assertEmptyResponseGeneratedFromCoordinator(responseFuture, 
expectedVersion);
+        }
+    }
+
     @Test
     void testReconnectAfterSinkFunctionSocketDisconnect() throws Exception {
         try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {

Reply via email to