Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5432#discussion_r168195242
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -210,51 +225,72 @@ protected void run(String[] args) throws Exception {
 		final ClusterClient<T> client;
-		if (clusterId != null) {
-			client = clusterDescriptor.retrieve(clusterId);
-		} else {
-			final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
-			client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-		}
+		// directly deploy the job if the cluster is started in job mode and detached
+		if (flip6 && clusterId == null && runOptions.getDetachedMode()) {
+			int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
-		try {
-			client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
-			client.setDetached(runOptions.getDetachedMode());
-			LOG.debug("Client slots is set to {}", client.getMaxSlots());
-
-			LOG.debug(runOptions.getSavepointRestoreSettings().toString());
-
-			int userParallelism = runOptions.getParallelism();
-			LOG.debug("User parallelism is set to {}", userParallelism);
-			if (client.getMaxSlots() != -1 && userParallelism == -1) {
-				logAndSysout("Using the parallelism provided by the remote cluster ("
-					+ client.getMaxSlots() + "). "
-					+ "To use another parallelism, set it at the ./bin/flink client.");
-				userParallelism = client.getMaxSlots();
-			} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
-				userParallelism = defaultParallelism;
-			}
+			final JobGraph jobGraph = createJobGraph(configuration, program, parallelism);
-			executeProgram(program, client, userParallelism);
-		} finally {
-			if (clusterId == null && !client.isDetached()) {
-				// terminate the cluster only if we have started it before and if it's not detached
-				try {
-					clusterDescriptor.terminateCluster(client.getClusterId());
-				} catch (FlinkException e) {
-					LOG.info("Could not properly terminate the Flink cluster.", e);
-				}
-			}
+			final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+			client = clusterDescriptor.deployJobCluster(
+				clusterSpecification,
+				jobGraph,
+				runOptions.getDetachedMode());
+
+			logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
 			try {
 				client.shutdown();
 			} catch (Exception e) {
 				LOG.info("Could not properly shut down the client.", e);
 			}
+		} else {
+			if (clusterId != null) {
+				client = clusterDescriptor.retrieve(clusterId);
+			} else {
+				// also in job mode we have to deploy a session cluster because the job
+				// might consist of multiple parts (e.g. when using collect)
+				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+				client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+			}
+
+			try {
+				client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
+				client.setDetached(runOptions.getDetachedMode());
+				LOG.debug("Client slots is set to {}", client.getMaxSlots());
+
+				LOG.debug(runOptions.getSavepointRestoreSettings().toString());
--- End diff ---
True, will improve it.
---
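
[Editor's aside] For readers following the thread, here is a minimal standalone sketch of the deployment decision the hunk above introduces. Only deployJobCluster, retrieve, and deploySessionCluster are taken from the diff; the helper method, its parameter list, and the import paths are assumptions made for illustration and are not part of the PR.

// Standalone sketch of the branching in the hunk above.
// The chooseClient helper and its parameters are hypothetical; the three
// ClusterDescriptor calls are the ones visible in the quoted diff.
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

public final class DeploymentChoiceSketch {

	static <T> ClusterClient<T> chooseClient(
			ClusterDescriptor<T> clusterDescriptor,
			T clusterId,
			boolean flip6,
			boolean detached,
			ClusterSpecification spec,
			JobGraph jobGraph) throws Exception {

		if (flip6 && clusterId == null && detached) {
			// job mode + detached: hand the JobGraph straight to a per-job cluster
			return clusterDescriptor.deployJobCluster(spec, jobGraph, true);
		} else if (clusterId != null) {
			// attach to an already running cluster
			return clusterDescriptor.retrieve(clusterId);
		} else {
			// otherwise deploy a session cluster, since the program may consist
			// of multiple parts (e.g. when using collect())
			return clusterDescriptor.deploySessionCluster(spec);
		}
	}
}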