[ 
https://issues.apache.org/jira/browse/FLINK-16279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049092#comment-17049092
 ] 

Kostas Kloudas commented on FLINK-16279:
----------------------------------------

If I understand the usecase correctly, we have a job submitted in yarn pre-job 
mode, attached, using {{executeAsync()}} and the job failed but the cluster was 
expecting a final request for the job result to shut down, but this request 
never came.

[~tison] For session cluster the cluster lifecycle is independent from that of 
the job, so I guess that no action should be taken in this case.

For the per-job cluster, the {{shutdownOnExit}} could maybe work because as 
soon as the client disconnects, it will issue a (best-effort) shutdown cluster 
command. 

Another (maybe cleaner) solution, could be that if the job reaches a terminal 
state which is NOT normal termination, then we always tear down the cluster. 
The benefit of this method is that it is the dispatcher that aligns the 
lifecycle of the job with that of the cluster, and not the client which is 
controlled by the user. This would require changes in the 
{{jobReachedGloballyTerminalState()}} in the {{MiniDispatcher}}. This of course 
leaves open the scenario of what happens when the client fails/disconnects. In 
this case we will have a "zombie" cluster. In this case we may need the 
{{shutdownOnExit}}.

But these are just thoughts that I have not yet investigated 100%.

What are your thoughts on that? 

> Per job Yarn application leak in normal execution mode.
> -------------------------------------------------------
>
>                 Key: FLINK-16279
>                 URL: https://issues.apache.org/jira/browse/FLINK-16279
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Wenlong Lyu
>            Priority: Major
>
> I run a job in yarn per job mode using {{env.executeAsync}}, the job failed 
> but the yarn cluster didn't be destroyed.
> After some research on the code, I found that:
> when running in attached mode, MiniDispatcher will never set 
> {{shutDownfuture}} before received a request from job client. 
> {code}
>               if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
>                       // terminate the MiniDispatcher once we served the 
> first JobResult successfully
>                       jobResultFuture.thenAccept((JobResult result) -> {
>                               ApplicationStatus status = 
> result.getSerializedThrowable().isPresent() ?
>                                               ApplicationStatus.FAILED : 
> ApplicationStatus.SUCCEEDED;
>                               LOG.debug("Shutting down per-job cluster 
> because someone retrieved the job result.");
>                               shutDownFuture.complete(status);
>                       });
>               } 
> {code}
> However, when running in async mode(submit job by env.executeAsync), there 
> may be no request from job client because when a user find that the job is 
> failed from job client, he may never request the result again.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to