[
https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387939#comment-14387939
]
ASF GitHub Bot commented on FLINK-1771:
---------------------------------------
Github user hsaputra commented on a diff in the pull request:
https://github.com/apache/flink/pull/542#discussion_r27451902
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -265,12 +266,32 @@ protected int run(String[] args) {
}
try {
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
- int parallelism = options.getParallelism();
- int exitCode = executeProgram(program, client, parallelism);
-
- if (yarnCluster != null) {
+ int userParallelism = options.getParallelism();
+ LOG.debug("User parallelism is set to {}", userParallelism);
+
+ Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+ LOG.debug("Client slots is set to {}", client.getMaxSlots());
+ if(client.getMaxSlots() != -1 && userParallelism == -1) {
+ logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+ "To use another parallelism, set it at the ./bin/flink client.");
+ userParallelism = client.getMaxSlots();
+ }
+ int exitCode = 0;
+
+ // check if detached per job yarn cluster is used to start flink
+ if(yarnCluster != null && yarnCluster.isDetached()) {
+ logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+ "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+ "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+ "Please also note that the temporary files of the YARN session in the home directory will not be removed.");
+ executeProgram(program, client, userParallelism, false);
+ } else {
+ // regular (blocking) execution.
+ exitCode = executeProgram(program, client, userParallelism, true);
+ }
+
+ // show YARN cluster status if it's not a detached YARN cluster.
+ if (yarnCluster != null && !yarnCluster.isDetached()) {
--- End diff --
Since a finally block runs even when the try block returns, could the code
block following this check be moved into the finally block below?
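To illustrate the point about finally, here is a minimal standalone sketch.
It is not the CliFrontend code: the class FinallyDemo, the method run, and the
event strings are all hypothetical stand-ins. It only shows that statements in
a finally block execute on every exit path of the try block, including an
early return.

```java
import java.util.ArrayList;
import java.util.List;

public class FinallyDemo {

    // Records the order of events so the control flow can be inspected.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for a run(...) method that may return early.
    static int run(boolean failEarly) {
        events.clear();
        try {
            events.add("execute program");
            if (failEarly) {
                return 1; // early return: the finally block below still runs
            }
            return 0;
        } finally {
            // Code placed here runs whether the try block returned
            // normally, returned early, or threw an exception.
            events.add("show cluster status");
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true) + " " + events);
        System.out.println(run(false) + " " + events);
    }
}
```

Two caveats if the block is moved: code in finally also runs when the program
execution failed, which may or may not be wanted for a status report, and a
return statement inside finally would override the value returned from the try
block.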
> Add support for submitting single jobs to a detached YARN session
> -----------------------------------------------------------------
>
> Key: FLINK-1771
> URL: https://issues.apache.org/jira/browse/FLINK-1771
> Project: Flink
> Issue Type: Improvement
> Components: YARN Client
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Robert Metzger
>
> We need tests ensuring that the processing slots are set properly when
> starting Flink on YARN, in particular with the per job YARN session feature.
> Also, the YARN tests for detached YARN sessions / per job yarn clusters are
> polluting the local home-directory.