This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 750311755b55ea2abf3313798453dcfefc36675a Author: Matthias Pohl <[email protected]> AuthorDate: Tue Aug 29 13:34:38 2023 +0200 [FLINK-32751][streaming] Fixes bug where the enqueued requests are not properly cancelled --- .../collect/CollectSinkOperatorCoordinator.java | 5 +++++ .../collect/CollectSinkOperatorCoordinatorTest.java | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index 13957b297a5..74b5de2d136 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -91,6 +91,11 @@ public class CollectSinkOperatorCoordinator public void close() throws Exception { LOG.info("Closing the CollectSinkOperatorCoordinator."); this.executorService.shutdownNow(); + + // cancelling all ongoing requests explicitly + ongoingRequests.forEach(ft -> ft.cancel(true)); + ongoingRequests.clear(); + closeConnection(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java index ac89d264ed0..f84511aa095 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java @@ -171,6 +171,26 @@ class CollectSinkOperatorCoordinatorTest { } } + @Test + void testCoordinatorNotConnectingToTheSinkFunctionSocket() throws Exception { + try (final TestingSinkFunction sinkFunction = + TestingSinkFunction.createTestingSinkFunctionWithoutConnection()) { + final CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(); + coordinator.start(); + sinkFunction.registerSinkFunctionWith(coordinator); + + final String expectedVersion = "version"; + final CompletableFuture<CoordinationResponse> responseFuture = + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(expectedVersion)); + assertThat(responseFuture).isNotDone(); + + // closing th coordinator before the request is sent should result in an empty response + coordinator.close(); + assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion); + } + } + @Test void testReconnectAfterSinkFunctionSocketDisconnect() throws Exception { try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
