Wancheng Xiao created FLINK-34727:
-------------------------------------
Summary: RestClusterClient.requestJobResult throw
ConnectionClosedException when the accumulator data is large
Key: FLINK-34727
URL: https://issues.apache.org/jira/browse/FLINK-34727
Project: Flink
Issue Type: Bug
Components: Runtime / REST
Affects Versions: 1.19.0, 1.16.2
Environment: I added some logging to trace this bug: !AbstractHandler.png!
!AbstractRestHandler.png!
!MiniDispatcher.png!
!RestServerEndpoint.png!
Then I got:
/* then requestJobResult is called and shutDownFuture completes (the channel is
closed when the request is deregistered) */
2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler -
JobExecutionResultHandler gateway.requestJobStatus complete.
[jobStatus=FINISHED]
/*submit sendResponse*/
2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler - submit
HandlerUtils.sendResponse().
/* thenAccept(sendResponse()) is complete and will call inFlightRequestTracker,
but the future returned by sendResponse has not completed yet */
2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler -
requestProcessingFuture complete.
[requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
normally]]
/*sendResponse's write task is still running*/
2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO
o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler - write
/* the request is deregistered and shutdown proceeds, then the channel closes */
2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler - call
inFlightRequestTracker.deregisterRequest() done
2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO
o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline - pipeline close.
2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO
org.apache.flink.runtime.rest.handler.util.HandlerUtils - lastContentFuture
complete. [future=DefaultChannelPromise@621f03ea(failure:
java.nio.channels.ClosedChannelException)]
More details are in flink_bug_complex.log.
Reporter: Wancheng Xiao
Attachments: AbstractHandler.png, AbstractRestHandler.png,
MiniDispatcher.png, RestServerEndpoint.png, flink_bug_complex.log,
flink_bug_simple.log
The job succeeded, but "RestClusterClient.requestJobResult()" failed with a
ConnectionClosedException ("Channel became inactive").
After debugging, I speculate that the problem lies on the server side in
"AbstractRestHandler.respondToRequest()": the call
"response.thenAccept(resp -> HandlerUtils.sendResponse())" does not propagate
the future returned by sendResponse, so the server shutdown can proceed before
the response has been fully written. I suspect that "thenAccept()" needs to be
replaced with "thenCompose()".
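To illustrate why the choice matters, here is a minimal standalone sketch using plain CompletableFuture (no Flink classes; sendResponse below is a stand-in for HandlerUtils.sendResponse, not the real method): thenAccept discards the future returned inside the callback, while thenCompose flattens it into the chain, so only the latter waits for the simulated write.

```java
import java.util.concurrent.CompletableFuture;

public class ThenAcceptVsThenCompose {
    // Stand-in for HandlerUtils.sendResponse(): returns a future that
    // completes only when the (asynchronous) network write finishes.
    static CompletableFuture<Void> sendResponse(CompletableFuture<Void> writeDone) {
        return writeDone;
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = CompletableFuture.completedFuture("result");
        CompletableFuture<Void> writeDone = new CompletableFuture<>(); // the pending write

        // Current behavior: thenAccept ignores the future returned by the callback,
        // so viaAccept completes as soon as the callback returns.
        CompletableFuture<Void> viaAccept =
                response.thenAccept(r -> sendResponse(writeDone));

        // Suggested fix: thenCompose flattens the inner future, so viaCompose
        // stays incomplete until the write itself completes.
        CompletableFuture<Void> viaCompose =
                response.thenCompose(r -> sendResponse(writeDone));

        System.out.println(viaAccept.isDone());   // true, even though the write is pending
        System.out.println(viaCompose.isDone());  // false, still waiting on the write
        writeDone.complete(null);
        System.out.println(viaCompose.isDone());  // true, the write is done
    }
}
```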
The details are as follows:
*Pseudocode:*
1 . AbstractHandler.responseAsLeader() {
2 .   inFlightRequestTracker.registerRequest()
3 .   CompletableFuture<Void> requestProcessingFuture = respondToRequest() {
4 .     response = JobExecutionResultHandler.handleRequest() {
5 .       return (MiniDispatcher) gateway.requestJobResult() {
6 .         CompletableFuture<JobResult> jobResultFuture = super.requestJobResult(jobId, timeout);
7 .         // waits until deregisterRequest completes, then shuts down the server
8 .         jobResultFuture.thenAccept(shutDownFuture.complete(status));
9 .         return jobResultFuture;
10.       }
11.     }
12.     // thenAccept completes requestProcessingFuture when the response future completes, not when the future returned by HandlerUtils.sendResponse() completes
13.     response.thenAccept(resp -> HandlerUtils.sendResponse(resp))
14.   }
15.   requestProcessingFuture.whenComplete(inFlightRequestTracker.deregisterRequest())
16. }
*Server handling steps:*
netty-thread: receives the request
flink-dispatcher-thread: executes requestJobResult[6] and completes shutDownFuture[8], then submits HandlerUtils.sendResponse[13] (an async netty write)
netty-thread: writes some data to the channel (not done yet)
flink-dispatcher-thread: calls inFlightRequestTracker.deregisterRequest[15]
netty-thread: writing to the channel fails, channel no longer active
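The ordering problem above can be sketched with plain CompletableFutures (a simplified model, not actual Flink code; lastContentFuture stands in for netty's write-completion promise): if the processing future is chained onto the write future with thenCompose, deregistration only fires after the write has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeregisterAfterWrite {
    public static void main(String[] args) {
        // Stand-ins: the handler's response and netty's write-completion promise.
        CompletableFuture<String> response = CompletableFuture.completedFuture("resp");
        CompletableFuture<Void> lastContentFuture = new CompletableFuture<>();
        AtomicBoolean deregistered = new AtomicBoolean(false);

        // Proposed shape: the processing future tracks the write via thenCompose,
        // so deregisterRequest (which unblocks shutdown) waits for the write.
        CompletableFuture<Void> requestProcessingFuture =
                response.thenCompose(r -> lastContentFuture);
        requestProcessingFuture.whenComplete((v, t) -> deregistered.set(true));

        System.out.println(deregistered.get()); // false: write in flight, channel must stay open
        lastContentFuture.complete(null);       // netty finishes flushing the response
        System.out.println(deregistered.get()); // true: now safe to deregister and shut down
    }
}
```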
Additionally:
While investigating this bug, FutureUtils.retryOperationWithDelay swallowed the
first occurrence of the "Channel became inactive" exception; after several
retries the server had already shut down, so the client ultimately threw a
"Connection refused" exception instead, which complicated troubleshooting.
Could we consider adding some logging here to aid future diagnostics?
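As a sketch of the logging suggestion (a hypothetical synchronous helper for illustration only; the real FutureUtils.retryOperationWithDelay is future-based and has a different signature): logging each swallowed attempt would surface the original "Channel became inactive" failure instead of only the final one.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryWithLogging {
    // Hypothetical retry helper that logs every failed attempt before retrying,
    // so the first underlying exception is visible in the logs.
    static <T> T retryWithLogging(Supplier<T> operation, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e;
                // This is the log line the report asks for: without it, the
                // original failure is swallowed until retries are exhausted.
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = retryWithLogging(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("Channel became inactive");
            }
            return "ok";
        }, 5);
        System.out.println(result); // succeeds on the third attempt, after two logged failures
    }
}
```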
--
This message was sent by Atlassian Jira
(v8.20.10#820010)