[ https://issues.apache.org/jira/browse/FLINK-16279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenlong Lyu updated FLINK-16279:
--------------------------------
Description:
I ran a job in YARN per-job mode using {{env.executeAsync}}. The job failed, but
the YARN cluster was not destroyed.
After some research on the code, I found that when running in attached mode,
MiniDispatcher never completes {{shutDownFuture}} until it has received a
request for the job result from the job client.
{code}
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
    // terminate the MiniDispatcher once we served the first JobResult successfully
    jobResultFuture.thenAccept((JobResult result) -> {
        ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
            ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(status);
    });
}
{code}
However, when running in async mode (submitting the job via {{env.executeAsync}}),
there may be no such request from the job client: once a user learns from the
job client that the job has failed, they may never request the result again.
was:
I run a job in yarn per job mode using {{env.executeAsync}}, the job failed but
the yarn cluster didn't be destroyed.
After some research on the code, I found that:
when running in attached mode, MiniDispatcher will neve set {{shutDownfuture}}
before received a request from job client.
{code}
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
    // terminate the MiniDispatcher once we served the first JobResult successfully
    jobResultFuture.thenAccept((JobResult result) -> {
        ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
            ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(status);
    });
}
{code}
However, when running in async mode(submit job by env.executeAsync), there may
be no request from job client because when a user find that the job is failed
from job client, he may never request the result again.
> Per job Yarn application leak in normal execution mode.
> -------------------------------------------------------
>
> Key: FLINK-16279
> URL: https://issues.apache.org/jira/browse/FLINK-16279
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Wenlong Lyu
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)