[ 
https://issues.apache.org/jira/browse/FLINK-34727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wancheng Xiao updated FLINK-34727:
----------------------------------
    Description: 
The task was succeed, but "RestClusterClient.requestJobResult()" encountered an 
error reporting ConnectionClosedException. (Channel became inactive)
After debugging, it is speculated that the problem occurred in the flink task 
server-side "AbstractRestHandler.respondToRequest()" with the 
"response.thenAccept(resp -> HandlerUtils.sendResponse())", this "thenAccept()" 
did not pass the feature returned by sendResponse, causing the server shutdown 
process before the request was sent. I suspect that "thenAccept()" needs to be 
replaced with "thenCompose()"

The details are as follows:

 

*Pseudocode:*

!image-2024-03-19-15-51-20-150.png!

 

*Server handling steps:*

netty-thread: got request
flink-dispatcher-thread: exec requestJobResult[6] and complete 
shutDownFuture[8], then call HandlerUtils.sendResponse[13](netty async write)
netty-thread: write some data to channel.(not done)
flink-dispatcher-thread: call inFlightRequestTracker.deregisterRequest[15]
netty-thread: write some data to channel failed, channel not active

i added some log to trace this bug:  !AbstractHandler.png!

!AbstractRestHandler.png!

!MiniDispatcher.png!

!RestServerEndpoint.png!

then i got:

/{*}then call requestJobResult and shutDownFuture.complete; (close channel when 
request deregisted){*}/
2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
JobExecutionResultHandler gateway.requestJobStatus complete. 
[jobStatus=FINISHED]
/{*}submit sendResponse{*}/
2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - submit 
HandlerUtils.sendResponse().
/{*}thenAccept(sendResponse()) is complete, will call inFlightRequestTracker, 
but sendResponse's return feature not completed{*}  /
2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
requestProcessingFuture complete. 
[requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
 normally]]
/{*}sendResponse's write task is still running{*}/
2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO  
o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler  - write
/{*}deregister request and then shut down, then channel close{*}/
2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - call 
inFlightRequestTracker.deregisterRequest() done
2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - pipeline close.
2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
org.apache.flink.runtime.rest.handler.util.HandlerUtils  - lastContentFuture 
complete. [future=DefaultChannelPromise@621f03ea(failure: 
java.nio.channels.ClosedChannelException)]

*more details in flink_bug_complex.log*

 

 

 

Additionally:

During the process of investigating this bug, 
FutureUtils.retryOperationWithDelay swallowed the first occurrence of the 
"Channel became inactive" exception and, after several retries, the server was 
shut down,then the client throw "Connection refused" Exception. which had some 
impact on the troubleshooting process. Could we consider adding some logging 
here to aid in future diagnostics?

  was:
The task was succeed, but "RestClusterClient.requestJobResult()" encountered an 
error reporting ConnectionClosedException. (Channel became inactive)
After debugging, it is speculated that the problem occurred in the flink task 
server-side "AbstractRestHandler.respondToRequest()" with the 
"response.thenAccept(resp -> HandlerUtils.sendResponse())", this "thenAccept()" 
did not pass the feature returned by sendResponse, causing the server shutdown 
process before the request was sent. I suspect that "thenAccept()" needs to be 
replaced with "thenCompose()"

The details are as follows:

 

*Pseudocode:*

1 . AbstractHandler.responseAsLeader(){
2 .   inFlightRequestTracker.registerRequest()
3 .   CompletableFuture<Void> requestProcessingFuture = respondToRequest(){
4 .    response = JobExecutionResultHandler.handleRequest() {
5 .      return (MiniDispatcher)gateway.requestJobResult(){

6 .        CompletableFuture<JobResult> jobResultFuture = 
super.requestJobResult(jobId, timeout);

7 .        // wait deregisterRequest completed. then shut down server

8 .        jobResultFuture.thenAccept(shutDownFuture.complete(status)); 

9 .        return jobResultFuture;

10.      }

11.    }
12.    // thenAccept cause requestProcessingFuture completed when 
HandlerUtils.sendResponse complete instead of HandlerUtils.sendResponse complete
13.    response.thenAccept(resp -> HandlerUtils.sendResponse(resp))
14.  }
15.  
requestProcessingFuture.whenComplete(inFlightRequestTracker.deregisterRequest())
16.}

 

*Server handling steps:*

netty-thread: got request
flink-dispatcher-thread: exec requestJobResult[6] and complete 
shutDownFuture[8], then call HandlerUtils.sendResponse[13](netty async write)
netty-thread: write some data to channel.(not done)
flink-dispatcher-thread: call inFlightRequestTracker.deregisterRequest[15]
netty-thread: write some data to channel failed, channel not active

i added some log to trace this bug:  !AbstractHandler.png!

!AbstractRestHandler.png!

!MiniDispatcher.png!

!RestServerEndpoint.png!

then i got:

/{*}then call requestJobResult and shutDownFuture.complete; (close channel when 
request deregisted){*}/
2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
JobExecutionResultHandler gateway.requestJobStatus complete. 
[jobStatus=FINISHED]
/{*}submit sendResponse{*}/
2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - submit 
HandlerUtils.sendResponse().
/{*}thenAccept(sendResponse()) is complete, will call inFlightRequestTracker, 
but sendResponse's return feature not completed{*}  /
2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
requestProcessingFuture complete. 
[requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
 normally]]
/{*}sendResponse's write task is still running{*}/
2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO  
o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler  - write
/{*}deregister request and then shut down, then channel close{*}/
2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO  
o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - call 
inFlightRequestTracker.deregisterRequest() done
2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - pipeline close.
2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
org.apache.flink.runtime.rest.handler.util.HandlerUtils  - lastContentFuture 
complete. [future=DefaultChannelPromise@621f03ea(failure: 
java.nio.channels.ClosedChannelException)]

*more details in flink_bug_complex.log*

 

 

 

Additionally:

During the process of investigating this bug, 
FutureUtils.retryOperationWithDelay swallowed the first occurrence of the 
"Channel became inactive" exception and, after several retries, the server was 
shut down,then the client throw "Connection refused" Exception. which had some 
impact on the troubleshooting process. Could we consider adding some logging 
here to aid in future diagnostics?


> RestClusterClient.requestJobResult throw ConnectionClosedException when the 
> accumulator data is large
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34727
>                 URL: https://issues.apache.org/jira/browse/FLINK-34727
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.16.2, 1.19.0
>            Reporter: Wancheng Xiao
>            Priority: Blocker
>         Attachments: AbstractHandler.png, AbstractRestHandler.png, 
> MiniDispatcher.png, RestServerEndpoint.png, flink_bug_complex.log, 
> flink_bug_simple.log, image-2024-03-19-15-51-20-150.png
>
>
> The task was succeed, but "RestClusterClient.requestJobResult()" encountered 
> an error reporting ConnectionClosedException. (Channel became inactive)
> After debugging, it is speculated that the problem occurred in the flink task 
> server-side "AbstractRestHandler.respondToRequest()" with the 
> "response.thenAccept(resp -> HandlerUtils.sendResponse())", this 
> "thenAccept()" did not pass the feature returned by sendResponse, causing the 
> server shutdown process before the request was sent. I suspect that 
> "thenAccept()" needs to be replaced with "thenCompose()"
> The details are as follows:
>  
> *Pseudocode:*
> !image-2024-03-19-15-51-20-150.png!
>  
> *Server handling steps:*
> netty-thread: got request
> flink-dispatcher-thread: exec requestJobResult[6] and complete 
> shutDownFuture[8], then call HandlerUtils.sendResponse[13](netty async write)
> netty-thread: write some data to channel.(not done)
> flink-dispatcher-thread: call inFlightRequestTracker.deregisterRequest[15]
> netty-thread: write some data to channel failed, channel not active
> i added some log to trace this bug:  !AbstractHandler.png!
> !AbstractRestHandler.png!
> !MiniDispatcher.png!
> !RestServerEndpoint.png!
> then i got:
> /{*}then call requestJobResult and shutDownFuture.complete; (close channel 
> when request deregisted){*}/
> 2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> JobExecutionResultHandler gateway.requestJobStatus complete. 
> [jobStatus=FINISHED]
> /{*}submit sendResponse{*}/
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - submit 
> HandlerUtils.sendResponse().
> /{*}thenAccept(sendResponse()) is complete, will call inFlightRequestTracker, 
> but sendResponse's return feature not completed{*}  /
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> requestProcessingFuture complete. 
> [requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
>  normally]]
> /{*}sendResponse's write task is still running{*}/
> 2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler  - write
> /{*}deregister request and then shut down, then channel close{*}/
> 2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - call 
> inFlightRequestTracker.deregisterRequest() done
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - pipeline close.
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> org.apache.flink.runtime.rest.handler.util.HandlerUtils  - lastContentFuture 
> complete. [future=DefaultChannelPromise@621f03ea(failure: 
> java.nio.channels.ClosedChannelException)]
> *more details in flink_bug_complex.log*
>  
>  
>  
> Additionally:
> During the process of investigating this bug, 
> FutureUtils.retryOperationWithDelay swallowed the first occurrence of the 
> "Channel became inactive" exception and, after several retries, the server 
> was shut down,then the client throw "Connection refused" Exception. which had 
> some impact on the troubleshooting process. Could we consider adding some 
> logging here to aid in future diagnostics?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to