zentol commented on code in PR #23180:
URL: https://github.com/apache/flink/pull/23180#discussion_r1291211941


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -101,75 +110,81 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
                 "Coordination request must be a CollectCoordinationRequest");
 
         CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
-        CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
-
         if (address == null) {
-            completeWithEmptyResponse(collectRequest, responseFuture);
-            return responseFuture;
+            return CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
         }
 
-        executorService.submit(() -> handleRequestImpl(collectRequest, responseFuture, address));
-        return responseFuture;
+        final CompletableFuture<CoordinationResponse> responseFuture =
+                FutureUtils.supplyAsync(
+                        () -> handleRequestImpl(collectRequest, address), executorService);
+
+        ongoingRequests.add(responseFuture);
+        return responseFuture.handle(
+                (response, error) -> {
+                    if (response != null) {
+                        ongoingRequests.remove(responseFuture);
+                        return response;
+                    }

Review Comment:
   Don't we always need to remove the request, not only when it succeeds?
   ```suggestion
                       ongoingRequests.remove(responseFuture);
                       if (response != null) {
                           return response;
                       }
   ```
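
A minimal, self-contained sketch of the suggested ordering, using only JDK classes. `OngoingRequestTracker`, `track`, and the `"empty"` placeholder are hypothetical stand-ins for illustration, not the coordinator's actual code; the point is only that the removal runs before the success check, so failed requests are deregistered too.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class OngoingRequestTracker {
    // Hypothetical stand-in for the coordinator's ongoingRequests collection.
    static final Set<CompletableFuture<String>> ongoingRequests = ConcurrentHashMap.newKeySet();

    static CompletableFuture<String> track(CompletableFuture<String> responseFuture) {
        ongoingRequests.add(responseFuture);
        return responseFuture.handle(
                (response, error) -> {
                    // Deregister first, whether the request succeeded or failed.
                    ongoingRequests.remove(responseFuture);
                    if (response != null) {
                        return response;
                    }
                    // Placeholder for the failure path (the PR returns an empty response).
                    return "empty";
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<String> okResult = track(ok);
        CompletableFuture<String> failedResult = track(failed);

        ok.complete("data");
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(okResult.join());
        System.out.println(failedResult.join());
        System.out.println("still tracked: " + ongoingRequests.size());
    }
}
```

With the removal inside the `response != null` branch instead, the failed future would stay in the set after completion.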



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -239,11 +230,12 @@ public void testCloseClientBeforeRequest() throws Exception {
             // Call get() on the future with a timeout of 0s so we can test that the exception
             // thrown is not a TimeoutException, which is what would be thrown if restClient were
             // not already closed
-            final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS);
-
-            final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause();
-            assertThat(cause, instanceOf(IllegalStateException.class));
-            assertThat(cause.getMessage(), equalTo("RestClient is already closed"));
+            assertThat(future)
+                    .failsWithin(Duration.ZERO)

Review Comment:
   Use FlinkAssertions instead? There are more instances below.
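
For reference, the behaviour this test relies on can be reproduced with plain JDK futures. The sketch below deliberately uses neither FlinkAssertions nor RestClient; `ZeroTimeoutGet` and `outcome` are hypothetical names. It only illustrates why a zero timeout distinguishes an already-failed future from a pending one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ZeroTimeoutGet {
    /** Describes how get() with a 0s timeout ends for the given future. */
    static String outcome(CompletableFuture<?> future) throws InterruptedException {
        try {
            future.get(0, TimeUnit.SECONDS);
            return "completed";
        } catch (TimeoutException e) {
            // The future was still pending when get() was called.
            return "timeout";
        } catch (ExecutionException e) {
            // The future had already failed; the original error is the cause.
            return "failed: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> alreadyFailed = new CompletableFuture<>();
        // Message taken from the assertion being replaced in the diff.
        alreadyFailed.completeExceptionally(
                new IllegalStateException("RestClient is already closed"));

        System.out.println(outcome(pending));
        System.out.println(outcome(alreadyFailed));
    }
}
```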



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -101,75 +110,81 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
                 "Coordination request must be a CollectCoordinationRequest");
 
         CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
-        CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
-
         if (address == null) {
-            completeWithEmptyResponse(collectRequest, responseFuture);
-            return responseFuture;
+            return CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
         }
 
-        executorService.submit(() -> handleRequestImpl(collectRequest, responseFuture, address));
-        return responseFuture;
+        final CompletableFuture<CoordinationResponse> responseFuture =
+                FutureUtils.supplyAsync(
+                        () -> handleRequestImpl(collectRequest, address), executorService);
+
+        ongoingRequests.add(responseFuture);
+        return responseFuture.handle(
+                (response, error) -> {
+                    if (response != null) {
+                        ongoingRequests.remove(responseFuture);
+                        return response;
+                    }
+
+                    // cancelling the future implies that the error handling happens somewhere else
+                    if (!ExceptionUtils.findThrowable(error, CancellationException.class)
+                            .isPresent()) {
+                        // Request failed: Close current connection and send back empty results
+                        // we catch every exception here because the Socket might suddenly become
+                        // null. We don't want the coordinator to fail if the sink fails.
+                        if (LOG.isDebugEnabled()) {
+                            LOG.warn(
+                                    "Collect sink coordinator encountered an unexpected error.",
+                                    error);
+                        } else {
+                            LOG.warn(
+                                    "Collect sink coordinator encounters a {}: {}",
+                                    error.getClass().getSimpleName(),
+                                    error.getMessage());
+                        }
+
+                        closeConnection();
+                    }
+
+                    return createEmptyResponse(collectRequest);

Review Comment:
   It's strange that we return an empty response here rather than an error.
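
To make the difference concrete, here is a hedged JDK-only sketch contrasting the two behaviours. `ErrorVersusEmpty`, `swallow`, and `propagate` are hypothetical names, and the empty string stands in for the empty response; this is not a proposal for how the coordinator should be changed.

```java
import java.util.concurrent.CompletableFuture;

public class ErrorVersusEmpty {
    // Mirrors the diff's approach: a failure is turned into an empty result,
    // so the caller cannot tell that anything went wrong.
    static CompletableFuture<String> swallow(CompletableFuture<String> future) {
        return future.handle((response, error) -> error == null ? response : "");
    }

    // Alternative: run cleanup as a side effect but keep the failure visible.
    static CompletableFuture<String> propagate(CompletableFuture<String> future) {
        return future.whenComplete(
                (response, error) -> {
                    // Cleanup such as closing a connection would go here.
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("socket closed"));

        CompletableFuture<String> swallowed = swallow(failed);
        CompletableFuture<String> propagated = propagate(failed);

        System.out.println("swallowed failed? " + swallowed.isCompletedExceptionally());
        System.out.println("propagated failed? " + propagated.isCompletedExceptionally());
    }
}
```

With `handle`, the returned future completes normally even when the request failed; with `whenComplete`, the original exception reaches the caller.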


