[ 
https://issues.apache.org/jira/browse/FLINK-34727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wancheng Xiao updated FLINK-34727:
----------------------------------
    Attachment: image-2024-03-19-15-51-20-150.png

> RestClusterClient.requestJobResult throw ConnectionClosedException when the 
> accumulator data is large
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34727
>                 URL: https://issues.apache.org/jira/browse/FLINK-34727
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.16.2, 1.19.0
>            Reporter: Wancheng Xiao
>            Priority: Blocker
>         Attachments: AbstractHandler.png, AbstractRestHandler.png, 
> MiniDispatcher.png, RestServerEndpoint.png, flink_bug_complex.log, 
> flink_bug_simple.log, image-2024-03-19-15-51-20-150.png
>
>
> The task succeeded, but "RestClusterClient.requestJobResult()" failed with a 
> ConnectionClosedException ("Channel became inactive").
> After debugging, the problem appears to be on the server side, in 
> "AbstractRestHandler.respondToRequest()": the call 
> "response.thenAccept(resp -> HandlerUtils.sendResponse())" does not propagate 
> the future returned by sendResponse, so the server starts its shutdown 
> process before the response has been fully written. I suspect that 
> "thenAccept()" needs to be replaced with "thenCompose()".
> The details are as follows:
>  
> *Pseudocode:*
> 1 . AbstractHandler.responseAsLeader(){
> 2 .   inFlightRequestTracker.registerRequest()
> 3 .   CompletableFuture<Void> requestProcessingFuture = respondToRequest(){
> 4 .    response = JobExecutionResultHandler.handleRequest() {
> 5 .      return (MiniDispatcher)gateway.requestJobResult(){
> 6 .        CompletableFuture<JobResult> jobResultFuture = super.requestJobResult(jobId, timeout);
> 7 .        // once the job result is available, complete shutDownFuture
> (the server shuts down after the request is deregistered)
> 8 .        jobResultFuture.thenAccept(status -> shutDownFuture.complete(status));
> 9 .        return jobResultFuture;
> 10.      }
> 11.    }
> 12.    // thenAccept makes requestProcessingFuture complete as soon as
> HandlerUtils.sendResponse() is invoked, not when the write it starts has
> actually finished
> 13.    response.thenAccept(resp -> HandlerUtils.sendResponse(resp))
> 14.  }
> 15.  requestProcessingFuture.whenComplete(inFlightRequestTracker.deregisterRequest())
> 16.}
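Outside Flink, the difference between the two combinators can be demonstrated with plain CompletableFuture. This is a minimal sketch: slowWrite is a hypothetical stand-in for HandlerUtils.sendResponse(), not the real method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThenAcceptVsThenCompose {

    // Stand-in for HandlerUtils.sendResponse(): starts an async "write"
    // that takes a while and returns the future tracking that write.
    static CompletableFuture<Void> slowWrite(AtomicBoolean writeDone) {
        return CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            writeDone.set(true);
        });
    }

    // Reports whether the write had finished by the time the outer
    // "request processing" future completed.
    static boolean writeDoneWhenOuterCompletes(boolean useCompose) {
        AtomicBoolean writeDone = new AtomicBoolean(false);
        CompletableFuture<String> response = CompletableFuture.completedFuture("resp");
        CompletableFuture<Void> outer = useCompose
                // thenCompose flattens the inner future: outer completes
                // only once the write itself has completed
                ? response.thenCompose(r -> slowWrite(writeDone))
                // thenAccept completes as soon as slowWrite() has been
                // *invoked*; the inner future is dropped on the floor
                : response.thenAccept(r -> slowWrite(writeDone));
        outer.join();
        return writeDone.get();
    }

    public static void main(String[] args) {
        System.out.println("thenAccept : write finished = " + writeDoneWhenOuterCompletes(false));
        System.out.println("thenCompose: write finished = " + writeDoneWhenOuterCompletes(true));
    }
}
```

With thenAccept the outer future completes while the simulated write is still sleeping, which mirrors requestProcessingFuture completing before the netty write finishes.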
>  
> *Server handling steps:*
> netty-thread: receives the request
> flink-dispatcher-thread: executes requestJobResult[6] and completes 
> shutDownFuture[8], then calls HandlerUtils.sendResponse[13] (an async netty 
> write)
> netty-thread: writes some data to the channel (not yet done)
> flink-dispatcher-thread: calls inFlightRequestTracker.deregisterRequest[15]
> netty-thread: writing the remaining data fails; the channel is no longer active
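The steps above can be simulated with a toy tracker. This is a hedged sketch: Tracker only mimics the register/deregister behaviour of Flink's InFlightRequestTracker, it is not the real class, and the "write" is a plain sleep standing in for the netty channel write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class InFlightTrackerSketch {

    // Greatly simplified stand-in for Flink's InFlightRequestTracker:
    // allDeregistered fires once the in-flight count reaches zero, and
    // the shutdown path then closes the channels.
    static class Tracker {
        private final AtomicInteger inFlight = new AtomicInteger();
        final CompletableFuture<Void> allDeregistered = new CompletableFuture<>();

        void registerRequest() { inFlight.incrementAndGet(); }

        void deregisterRequest() {
            if (inFlight.decrementAndGet() == 0) {
                allDeregistered.complete(null);
            }
        }
    }

    // Returns true when shutdown is released while the simulated write is
    // still in progress (the buggy ordering), false when deregistration
    // waits for the write first (the intended ordering).
    static boolean shutdownRacesWrite(boolean waitForWrite) {
        Tracker tracker = new Tracker();
        tracker.registerRequest();

        // Simulated async channel write, as netty would perform it.
        CompletableFuture<Void> write = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        if (waitForWrite) {
            write.join(); // deregister only after the write has completed
        }
        tracker.deregisterRequest();

        tracker.allDeregistered.join(); // shutdown path: channel closes here
        return !write.isDone();         // true => channel closed mid-write
    }

    public static void main(String[] args) {
        System.out.println("deregister early: races = " + shutdownRacesWrite(false));
        System.out.println("deregister after write: races = " + shutdownRacesWrite(true));
    }
}
```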
> I added some logging to trace this bug:  !AbstractHandler.png!
> !AbstractRestHandler.png!
> !MiniDispatcher.png!
> !RestServerEndpoint.png!
> Then I got:
> /{*}requestJobResult is called and shutDownFuture.complete runs (the channel 
> closes once the request is deregistered){*}/
> 2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> JobExecutionResultHandler gateway.requestJobStatus complete. 
> [jobStatus=FINISHED]
> /{*}submit sendResponse{*}/
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - submit 
> HandlerUtils.sendResponse().
> /{*}thenAccept(sendResponse()) is complete and inFlightRequestTracker will be 
> called, but the future returned by sendResponse has not completed{*}/
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> requestProcessingFuture complete. 
> [requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
>  normally]]
> /{*}sendResponse's write task is still running{*}/
> 2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler  - write
> /{*}deregister request and then shut down, then channel close{*}/
> 2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - call 
> inFlightRequestTracker.deregisterRequest() done
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - pipeline close.
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> org.apache.flink.runtime.rest.handler.util.HandlerUtils  - lastContentFuture 
> complete. [future=DefaultChannelPromise@621f03ea(failure: 
> java.nio.channels.ClosedChannelException)]
> *more details in flink_bug_complex.log*
>  
>  
>  
> Additionally:
> While investigating this bug, FutureUtils.retryOperationWithDelay swallowed 
> the first occurrence of the "Channel became inactive" exception; after 
> several retries the server had already shut down, so the client threw a 
> "Connection refused" exception instead, which hampered the troubleshooting. 
> Could we consider adding some logging here to aid future diagnostics?
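A sketch of what such logging could look like. This uses a generic, self-contained retry helper rather than the real FutureUtils.retryOperationWithDelay signature, so every name here is illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class RetryWithLogging {

    // Generic async retry that logs every failed attempt instead of
    // silently swallowing the intermediate exceptions.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> op,
                                          int retries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, retries, 1, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op,
                                    int retries, int n,
                                    CompletableFuture<T> result) {
        op.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (n <= retries) {
                // Surfacing the intermediate failure makes the *first*
                // exception (e.g. "Channel became inactive") visible.
                System.err.println("Attempt " + n + " failed, retrying: " + error);
                attempt(op, retries, n + 1, result);
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String value = retry(() -> {
            calls[0]++;
            return calls[0] < 3
                    ? CompletableFuture.<String>failedFuture(
                            new RuntimeException("Channel became inactive"))
                    : CompletableFuture.completedFuture("ok");
        }, 5).join();
        System.out.println(value);
    }
}
```

Logging each failed attempt would have shown the original "Channel became inactive" error immediately instead of only the final "Connection refused".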



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
