GitHub user MayerRoman opened a pull request:
https://github.com/apache/flink/pull/3287
[FLINK-4540][yarn] Delayed cluster startup until the job has been fully
assembled, in the case of launching detached jobs
[FLINK-4540] Detached job execution may prevent cluster shutdown.
**workflow description:**
When a detached job is launched on YARN, `FlinkYarnSessionCli#createCluster`
calls `AbstractYarnClusterDescriptor#deploy`, which triggers deployment of a
Flink cluster on YARN.
After that, `CliFrontend#executeProgram` calls `ClusterClient#run`, which
starts the job preparation process.
`YarnClusterClient#submitJob` then calls
`YarnClusterClient#stopAfterJob`, which ensures that the YarnJobManager shuts
down after the job completes, and `ClusterClient#runDetached`, which sends
the job to the cluster.
**how the bug occurs:**
If an error is thrown after `AbstractYarnClusterDescriptor#deploy` but
before the job is sent to the cluster, the JobManager never shuts the
cluster down, and the cluster keeps running without a job.
**changes:**
This pull request defers cluster startup until the job has been fully
assembled.
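The effect of the reordering can be sketched in isolation. The snippet below is only an illustration and not the code in this pull request: `FakeCluster`, `deployThenAssemble`, and `assembleThenDeploy` are hypothetical names, and the "cluster" is a plain object with a `running` flag.

```java
import java.util.function.Supplier;

class DeferredStartupSketch {

    /** Hypothetical stand-in for a YARN cluster; not a Flink class. */
    static final class FakeCluster {
        boolean running;

        void deploy() {
            running = true;
        }

        /** Simulates a detached job that finishes and lets the cluster stop itself. */
        void submitDetachedAndFinish(String job) {
            running = false;
        }
    }

    /** Old ordering: the cluster is deployed before the job is assembled. */
    static boolean deployThenAssemble(Supplier<String> jobAssembly) {
        FakeCluster cluster = new FakeCluster();
        cluster.deploy();
        try {
            cluster.submitDetachedAndFinish(jobAssembly.get());
        } catch (RuntimeException e) {
            // The client gives up here; nothing tells the cluster to stop.
        }
        return cluster.running;
    }

    /** New ordering: the job is assembled first, then the cluster is deployed. */
    static boolean assembleThenDeploy(Supplier<String> jobAssembly) {
        FakeCluster cluster = new FakeCluster();
        try {
            String job = jobAssembly.get(); // may throw before anything is deployed
            cluster.deploy();
            cluster.submitDetachedAndFinish(job);
        } catch (RuntimeException e) {
            // Assembly failed, but no cluster was started.
        }
        return cluster.running;
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> {
            throw new IllegalStateException("job assembly failed");
        };
        System.out.println("old ordering leaves cluster running: " + deployThenAssemble(broken));
        System.out.println("new ordering leaves cluster running: " + assembleThenDeploy(broken));
    }
}
```

With a job assembly step that throws, only the old ordering ends with a cluster that is still marked as running.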
**additional information:**
- these changes do not affect the FLIP-6 work
- explanation for lines 330-331 in `FlinkYarnSessionCli`:
After `flink run ...` is entered on the command line, Flink creates an instance of
`CliFrontend`.
Before an instance of `CliFrontend` is created, the static block in this class is
executed. It creates instances of `FlinkYarnSessionCli`, `FlinkYarnCli`, and
`DefaultCli` and puts them in a static LinkedList.
In real use, a new JVM starts for every invocation.
When the tests in flink-yarn-tests are executed, jobs are launched in
different modes within one JVM, so the static block in `CliFrontend` is
executed only once.
The `FlinkYarnSessionCli` instance created there is therefore reused by all
tests in the class.
`YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnCluster` and
`#testDetachedPerJobYarnClusterWithStreamingJob` set
`private boolean detachedMode` in `FlinkYarnSessionCli` to true during execution.
If `YARNSessionCapacitySchedulerITCase#perJobYarnCluster` or
`#perJobYarnClusterWithParallelism` runs after them, the stale `detachedMode`
value in `FlinkYarnSessionCli` sends it down the wrong execution path.
The previous version of this code assumed that the field is initially false
and only checked whether it needs to be set to true.
```
if (cmd.hasOption(DETACHED.getOpt()) ||
        cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
    this.detachedMode = true;
    yarnClusterDescriptor.setDetachedMode(true);
}
```
The new version always assigns `detachedMode`, so if it was true and the
current launch is not a detached job, it is reset to false.
```
this.detachedMode = cmd.hasOption(DETACHED.getOpt()) ||
        cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
yarnClusterDescriptor.setDetachedMode(this.detachedMode);
```
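The difference between the two versions can be reproduced outside of Flink. The sketch below is an illustration only: `SharedCli`, `parseOld`, and `parseNew` are hypothetical names that mimic one CLI object being reused for a detached run followed by a non-detached run, as happens in the shared test JVM.

```java
class DetachedFlagSketch {

    /** Hypothetical stand-in for a CLI object shared by several tests in one JVM. */
    static final class SharedCli {
        private boolean detachedMode;

        /** Old behavior: the flag can only ever be switched on. */
        void parseOld(boolean detachedOptionPresent) {
            if (detachedOptionPresent) {
                detachedMode = true;
            }
        }

        /** New behavior: the flag always reflects the current command line. */
        void parseNew(boolean detachedOptionPresent) {
            detachedMode = detachedOptionPresent;
        }

        boolean isDetachedMode() {
            return detachedMode;
        }
    }

    /** Detached run followed by a non-detached run, using the old parsing. */
    static boolean detachedAfterSecondRunOld() {
        SharedCli cli = new SharedCli();
        cli.parseOld(true);
        cli.parseOld(false);
        return cli.isDetachedMode();
    }

    /** Same sequence of runs, using the new parsing. */
    static boolean detachedAfterSecondRunNew() {
        SharedCli cli = new SharedCli();
        cli.parseNew(true);
        cli.parseNew(false);
        return cli.isDetachedMode();
    }

    public static void main(String[] args) {
        System.out.println("old parsing, second run detached: " + detachedAfterSecondRunOld());
        System.out.println("new parsing, second run detached: " + detachedAfterSecondRunNew());
    }
}
```

With the old parsing the second, non-detached run still sees `detachedMode == true`; with the new parsing the flag is reset.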
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/MayerRoman/flink FLINK-4540
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3287.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3287
----
commit cca5c31767a76a560e66193f28e023210d592dbf
Author: Roman Maier <[email protected]>
Date: 2017-02-08T08:04:15Z
[FLINK-4540][yarn] Delayed cluster startup until the job has been fully
assembled, in the case of launching detached jobs
----