This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new ee6fbd0c4a9 [FLINK-32751][runtime] Refactors close handling to CollectSinkOperatorCoordinator (#23238)
ee6fbd0c4a9 is described below

commit ee6fbd0c4a9e3ee1b30872389829a284b1c6e215
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Aug 21 09:18:56 2023 +0200

    [FLINK-32751][runtime] Refactors close handling to CollectSinkOperatorCoordinator (#23238)
    
    shutdownNow() is added to immediately stop any already submitted requests. That makes
    the close logic more explicit rather than relying on the Runnables to finish implicitly
    through the connection loss.
    
    Any ongoing requests are now also cancelled. This allows us to log specific cases of unexpected
    errors properly.
---
 .../collect/CollectSinkOperatorCoordinator.java    | 140 ++++++++++++---------
 1 file changed, 82 insertions(+), 58 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index e926d625e0a..806ed180496 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -25,8 +25,10 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandle
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +43,10 @@ import java.io.ObjectOutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -63,6 +68,9 @@ public class CollectSinkOperatorCoordinator
     private DataInputViewStreamWrapper inStream;
     private DataOutputViewStreamWrapper outStream;
 
+    private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests 
=
+            ConcurrentHashMap.newKeySet();
+
     private ExecutorService executorService;
 
     public CollectSinkOperatorCoordinator(int socketTimeout) {
@@ -79,8 +87,9 @@ public class CollectSinkOperatorCoordinator
 
     @Override
     public void close() throws Exception {
+        LOG.info("Closing the CollectSinkOperatorCoordinator.");
+        this.executorService.shutdownNow();
         closeConnection();
-        this.executorService.shutdown();
     }
 
     @Override
@@ -101,75 +110,82 @@ public class CollectSinkOperatorCoordinator
                 "Coordination request must be a CollectCoordinationRequest");
 
         CollectCoordinationRequest collectRequest = 
(CollectCoordinationRequest) request;
-        CompletableFuture<CoordinationResponse> responseFuture = new 
CompletableFuture<>();
-
         if (address == null) {
-            completeWithEmptyResponse(collectRequest, responseFuture);
-            return responseFuture;
+            return 
CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
         }
 
-        executorService.submit(() -> handleRequestImpl(collectRequest, 
responseFuture, address));
-        return responseFuture;
+        final CompletableFuture<CoordinationResponse> responseFuture =
+                FutureUtils.supplyAsync(
+                        () -> handleRequestImpl(collectRequest, address), 
executorService);
+
+        ongoingRequests.add(responseFuture);
+        return responseFuture.handle(
+                (response, error) -> {
+                    ongoingRequests.remove(responseFuture);
+
+                    if (response != null) {
+                        return response;
+                    }
+
+                    // cancelling the future implies that the error handling 
happens somewhere else
+                    if (!ExceptionUtils.findThrowable(error, 
CancellationException.class)
+                            .isPresent()) {
+                        // Request failed: Close current connection and send 
back empty results
+                        // we catch every exception here because the Socket 
might suddenly become
+                        // null. We don't want the coordinator to fail if the 
sink fails.
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn(
+                                    "Collect sink coordinator encountered an 
unexpected error.",
+                                    error);
+                        } else {
+                            LOG.warn(
+                                    "Collect sink coordinator encounters a {}: 
{}",
+                                    error.getClass().getSimpleName(),
+                                    error.getMessage());
+                        }
+
+                        closeConnection();
+                    }
+
+                    return createEmptyResponse(collectRequest);
+                });
     }
 
-    private void handleRequestImpl(
-            CollectCoordinationRequest request,
-            CompletableFuture<CoordinationResponse> responseFuture,
-            InetSocketAddress sinkAddress) {
+    private CoordinationResponse handleRequestImpl(
+            CollectCoordinationRequest request, InetSocketAddress sinkAddress) 
throws IOException {
         if (sinkAddress == null) {
-            closeConnection();
-            completeWithEmptyResponse(request, responseFuture);
-            return;
+            throw new NullPointerException("No sinkAddress available.");
         }
 
-        try {
-            if (socket == null) {
-                socket = new Socket();
-                socket.setSoTimeout(socketTimeout);
-                socket.setKeepAlive(true);
-                socket.setTcpNoDelay(true);
-
-                socket.connect(sinkAddress);
-                inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
-                outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
-                LOG.info("Sink connection established");
-            }
+        if (socket == null) {
+            socket = new Socket();
+            socket.setSoTimeout(socketTimeout);
+            socket.setKeepAlive(true);
+            socket.setTcpNoDelay(true);
 
-            // send version and offset to sink server
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Forwarding request to sink socket server");
-            }
-            request.serialize(outStream);
-
-            // fetch back serialized results
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Fetching serialized result from sink socket 
server");
-            }
-            responseFuture.complete(new CollectCoordinationResponse(inStream));
-        } catch (Exception e) {
-            // request failed, close current connection and send back empty 
results
-            // we catch every exception here because socket might suddenly 
becomes null if the sink
-            // fails
-            // and we do not want the coordinator to fail
-            if (LOG.isDebugEnabled()) {
-                // this is normal when sink restarts or job ends, so we print 
a debug log
-                LOG.debug("Collect sink coordinator encounters an exception", 
e);
-            }
-            closeConnection();
-            completeWithEmptyResponse(request, responseFuture);
+            socket.connect(sinkAddress);
+            inStream = new DataInputViewStreamWrapper(socket.getInputStream());
+            outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
+            LOG.info("Sink connection established");
         }
+
+        // send version and offset to sink server
+        LOG.debug("Forwarding request to sink socket server");
+        request.serialize(outStream);
+
+        // fetch back serialized results
+        LOG.debug("Fetching serialized result from sink socket server");
+        return new CollectCoordinationResponse(inStream);
     }
 
-    private void completeWithEmptyResponse(
-            CollectCoordinationRequest request, 
CompletableFuture<CoordinationResponse> future) {
-        future.complete(
-                new CollectCoordinationResponse(
-                        request.getVersion(),
-                        // this lastCheckpointedOffset is OK
-                        // because client will only expose results to the 
users when the
-                        // checkpointed offset increases
-                        -1,
-                        Collections.emptyList()));
+    private CollectCoordinationResponse 
createEmptyResponse(CollectCoordinationRequest request) {
+        return new CollectCoordinationResponse(
+                request.getVersion(),
+                // this lastCheckpointedOffset is OK
+                // because client will only expose results to the users when 
the
+                // checkpointed offset increases
+                -1,
+                Collections.emptyList());
     }
 
     private void closeConnection() {
@@ -217,6 +233,9 @@ public class CollectSinkOperatorCoordinator
             throws Exception {
         if (checkpointData == null) {
             // restore before any checkpoint completed
+            LOG.info("Any ongoing requests are cancelled due to a coordinator 
reset.");
+            cancelOngoingRequests();
+
             closeConnection();
         } else {
             ByteArrayInputStream bais = new 
ByteArrayInputStream(checkpointData);
@@ -225,6 +244,11 @@ public class CollectSinkOperatorCoordinator
         }
     }
 
+    private void cancelOngoingRequests() {
+        ongoingRequests.forEach(ft -> ft.cancel(true));
+        ongoingRequests.clear();
+    }
+
     /** Provider for {@link CollectSinkOperatorCoordinator}. */
     public static class Provider implements OperatorCoordinator.Provider {
 

Reply via email to