XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308278198


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing the CollectSinkOperatorCoordinator.");
+        running = false;
         this.executorService.shutdownNow();
+        ongoingRequests.forEach(ft -> ft.cancel(true));

Review Comment:
   Yeah, you're right. I don't know why but I remember that I assumed in the 
[previous PR #23180](https://github.com/apache/flink/pull/23180) that shutdown 
would trigger a cancellation of the queued tasks. But that's not the case. The 
guarding of the `SocketConnection` and the `running` are obsolete: The running 
tasks are handled by `shutdownNow` and the queued tasks are cancelled before 
executing them. Therefore, no concurrency happens when resetting the 
`SocketConnection`.
   
   That also explains the more frequent test failures: The previous `shutdown` 
call tried to handle the ongoing requests sequentially. The `shutdownNow` seem 
to have cancelled the ongoing task but queued tasks were executed afterwards 
which lead to the re-instantiation of the `SocketConnection`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to