XComp commented on code in PR #23180:
URL: https://github.com/apache/flink/pull/23180#discussion_r1291318008
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -101,75 +110,81 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
"Coordination request must be a CollectCoordinationRequest");
CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
- CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
-
if (address == null) {
- completeWithEmptyResponse(collectRequest, responseFuture);
- return responseFuture;
+ return CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
}
- executorService.submit(() -> handleRequestImpl(collectRequest, responseFuture, address));
- return responseFuture;
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ FutureUtils.supplyAsync(
+ () -> handleRequestImpl(collectRequest, address), executorService);
+
+ ongoingRequests.add(responseFuture);
+ return responseFuture.handle(
+ (response, error) -> {
+ if (response != null) {
+ ongoingRequests.remove(responseFuture);
+ return response;
+ }
+
+ // cancelling the future implies that the error handling happens somewhere else
+ if (!ExceptionUtils.findThrowable(error, CancellationException.class)
+ .isPresent()) {
+ // Request failed: Close current connection and send back empty results
+ // we catch every exception here because the Socket might suddenly become
+ // null. We don't want the coordinator to fail if the sink fails.
+ if (LOG.isDebugEnabled()) {
+ LOG.warn(
+ "Collect sink coordinator encountered an unexpected error.",
+ error);
+ } else {
+ LOG.warn(
+ "Collect sink coordinator encounters a {}: {}",
+ error.getClass().getSimpleName(),
+ error.getMessage());
+ }
+
+ closeConnection();
+ }
+
+ return createEmptyResponse(collectRequest);
Review Comment:
I guess the client code in `CollectResultFetcher#next` (see
[CollectResultFetcher:97ff](https://github.com/apache/flink/blob/8119411addd9c82c15bab8480e7b35b8e6394d43/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java#L97))
treats errors on Flink's side as internal. In its current implementation, it
has to fetch the final job result anyway to collect the final data. Hiding the
errors from the client keeps the error handling on Flink's side, and the client
can simply keep polling for more data until the job reaches a globally-terminal
state. :shrug:
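
To make the argument concrete, here is a minimal sketch of the pattern using only `java.util.concurrent`. It is not Flink's actual code: `CollectPollingSketch`, `fetchOrEmpty`, and `pollUntilTerminal` are hypothetical names, and a plain `List<String>` stands in for the coordination response. The serving side maps any failure to an empty batch, and the client keeps polling until a (here simulated) terminal-state check says the job is done.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these names exist in Flink.
final class CollectPollingSketch {

    // Serving side: run the fetch asynchronously and hide any failure from the
    // caller by answering with an empty batch instead of a failed future.
    static CompletableFuture<List<String>> fetchOrEmpty(Supplier<List<String>> fetch) {
        return CompletableFuture.supplyAsync(fetch)
                .handle((batch, error) ->
                        batch != null ? batch : Collections.<String>emptyList());
    }

    // Client side: an empty batch is not an error, so keep polling until the
    // job is reported as terminated (assumed to be checked through some
    // separate channel, modelled here as a BooleanSupplier).
    static List<String> pollUntilTerminal(
            Supplier<CompletableFuture<List<String>>> request,
            BooleanSupplier jobTerminated) {
        List<String> results = new ArrayList<>();
        while (!jobTerminated.getAsBoolean()) {
            results.addAll(request.get().join());
        }
        return results;
    }
}
```

With this shape, a transient failure on the serving side only costs the client one empty poll; whether the job actually failed is still decided by the terminal-state check rather than by the individual response.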