[ 
https://issues.apache.org/jira/browse/FLINK-34727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839922#comment-17839922
 ] 

Wancheng Xiao commented on FLINK-34727:
---------------------------------------

[~Weijie Guo] Hello, could you help find someone to take a look at this? We have 
been troubled by this issue for some time.

> RestClusterClient.requestJobResult throw ConnectionClosedException when the 
> accumulator data is large
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34727
>                 URL: https://issues.apache.org/jira/browse/FLINK-34727
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.16.2, 1.19.0
>            Reporter: Wancheng Xiao
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: AbstractHandler.png, AbstractRestHandler.png, 
> MiniDispatcher.png, RestServerEndpoint.png, flink_bug_complex.log, 
> flink_bug_simple.log, image-2024-03-19-15-51-20-150.png
>
>
> The task succeeded, but "RestClusterClient.requestJobResult()" failed with a 
> ConnectionClosedException ("Channel became inactive").
> After debugging, I suspect that the problem lies on the Flink server side, in 
> "AbstractRestHandler.respondToRequest()", at 
> "response.thenAccept(resp -> HandlerUtils.sendResponse())". This 
> "thenAccept()" does not pass on the future returned by sendResponse(), so the 
> server's shutdown process can start before the response has been sent. I 
> suspect that "thenAccept()" needs to be replaced with "thenCompose()".
> The details are as follows:
>  
> *Pseudocode:*
> !image-2024-03-19-15-51-20-150.png|width=802,height=222!
>  
> *Server handling steps:*
> netty-thread: receives the request
> flink-dispatcher-thread: executes requestJobResult[6] and completes 
> shutDownFuture[8], then calls HandlerUtils.sendResponse[13] (Netty async write)
> netty-thread: writes some data to the channel (not finished)
> flink-dispatcher-thread: calls inFlightRequestTracker.deregisterRequest[15]
> netty-thread: writing data to the channel fails because the channel is not active
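
To illustrate the suspected race in isolation, here is a minimal Java sketch that uses only java.util.concurrent.CompletableFuture. It is not Flink code: the class ThenAcceptVsThenCompose, its method names, and the writeDone future (a hypothetical stand-in for the future returned by HandlerUtils.sendResponse()) are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ThenAcceptVsThenCompose {

    /**
     * Hypothetical stand-in for HandlerUtils.sendResponse(): the returned
     * future completes only once the asynchronous write has finished.
     */
    static CompletableFuture<Void> sendResponse(String body, CompletableFuture<Void> writeDone) {
        return writeDone;
    }

    /** thenAccept() discards the future returned by sendResponse(). */
    static CompletableFuture<Void> withThenAccept(
            CompletableFuture<String> response, CompletableFuture<Void> writeDone) {
        return response.thenAccept(resp -> sendResponse(resp, writeDone));
    }

    /** thenCompose() chains on the future returned by sendResponse(). */
    static CompletableFuture<Void> withThenCompose(
            CompletableFuture<String> response, CompletableFuture<Void> writeDone) {
        return response.thenCompose(resp -> sendResponse(resp, writeDone));
    }

    public static void main(String[] args) {
        // The write is still pending, as in the "not finished" step above.
        CompletableFuture<Void> writeDone = new CompletableFuture<>();
        CompletableFuture<String> response = CompletableFuture.completedFuture("job result");

        CompletableFuture<Void> accepted = withThenAccept(response, writeDone);
        CompletableFuture<Void> composed = withThenCompose(response, writeDone);

        System.out.println("thenAccept done before write finished:  " + accepted.isDone());
        System.out.println("thenCompose done before write finished: " + composed.isDone());

        writeDone.complete(null); // the write finishes
        System.out.println("thenCompose done after write finished:  " + composed.isDone());
    }
}
```

Running main prints true, false, true: the thenAccept() variant reports completion while the write is still pending, whereas the thenCompose() variant completes only after writeDone does. Whether this swap alone is the right fix in AbstractRestHandler is for reviewers to confirm.
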
> I added some logging to trace this bug:
> !AbstractHandler.png|width=406,height=313!
> !AbstractRestHandler.png|width=418,height=322!
> !MiniDispatcher.png|width=419,height=277!
> !RestServerEndpoint.png|width=419,height=279!
> Then I got:
> /{*}then call requestJobResult and shutDownFuture.complete (the channel is 
> closed when the request is deregistered){*}/
> 2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> JobExecutionResultHandler gateway.requestJobStatus complete. 
> [jobStatus=FINISHED]
> /{*}submit sendResponse{*}/
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - submit 
> HandlerUtils.sendResponse().
> /{*}thenAccept(sendResponse()) is complete and will call inFlightRequestTracker, 
> but the future returned by sendResponse is not completed{*}/
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> requestProcessingFuture complete. 
> [requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
>  normally]]
> /{*}sendResponse's write task is still running{*}/
> 2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler  - write
> /{*}deregister request and then shut down, then channel close{*}/
> 2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - call 
> inFlightRequestTracker.deregisterRequest() done
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - pipeline close.
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> org.apache.flink.runtime.rest.handler.util.HandlerUtils  - lastContentFuture 
> complete. [future=DefaultChannelPromise@621f03ea(failure: 
> java.nio.channels.ClosedChannelException)]
> *More details are in flink_bug_complex.log*
>  
>  
>  
> Additionally:
> While investigating this bug, I noticed that 
> FutureUtils.retryOperationWithDelay swallowed the first occurrence of the 
> "Channel became inactive" exception. After several retries, the server had 
> shut down, so the client threw a "Connection refused" exception instead, which 
> made troubleshooting harder. Could we consider adding some 
> logging here to aid future diagnostics?
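
As a rough illustration of the logging request above, the following sketch records every failed attempt before retrying, so the first failure stays visible even when a later attempt fails differently. It is not Flink's FutureUtils.retryOperationWithDelay: the class LoggingRetrySketch, the method retryWithLogging, and the in-memory attemptLog are hypothetical, and the retry delay is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class LoggingRetrySketch {

    /** One entry per failed attempt; a real client would write to its logger instead. */
    static final List<String> attemptLog = new ArrayList<>();

    /** Runs the operation, retrying up to `retries` more times after a failure. */
    static <T> CompletableFuture<T> retryWithLogging(
            Supplier<CompletableFuture<T>> operation, int retries) {
        return retry(operation, retries, 1);
    }

    private static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation, int retriesLeft, int attempt) {
        return operation.get()
                .<CompletableFuture<T>>handle((value, failure) -> {
                    if (failure == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    // Unwrap CompletionException so the log shows the root cause.
                    Throwable cause =
                            failure instanceof CompletionException && failure.getCause() != null
                                    ? failure.getCause()
                                    : failure;
                    // Record the failure before deciding whether to retry.
                    attemptLog.add("attempt " + attempt + " failed: " + cause.getMessage());
                    if (retriesLeft <= 0) {
                        CompletableFuture<T> failed = new CompletableFuture<>();
                        failed.completeExceptionally(cause);
                        return failed;
                    }
                    return retry(operation, retriesLeft - 1, attempt + 1);
                })
                .thenCompose(next -> next);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated client call: the first failure differs from the later ones.
        retryWithLogging(() -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            String message = ++calls[0] == 1 ? "Channel became inactive" : "Connection refused";
            f.completeExceptionally(new java.io.IOException(message));
            return f;
        }, 2);
        attemptLog.forEach(System.out::println);
    }
}
```

In this simulation the final exception is "Connection refused", but the log still shows that attempt 1 failed with "Channel became inactive", which is the information that was missing during the investigation described above.
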



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
