This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8212ba8d3721f7fc985036473b76bee32eddbdc7
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 18 10:25:19 2019 +0100

    [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs.
---
 .../deployment/AbstractJobClusterExecutor.java     | 71 ++++++++++++++++++++++
 .../deployment/AbstractSessionClusterExecutor.java | 66 ++++++++++++++++++++
 .../flink/client/deployment/ExecutorUtils.java     | 59 ++++++++++++++++++
 .../StandaloneSessionClusterExecutor.java          | 63 +------------------
 .../org/apache/flink/core/execution/Executor.java  | 13 ++--
 .../yarn/executors/YarnJobClusterExecutor.java     | 68 +--------------------
 .../yarn/executors/YarnSessionClusterExecutor.java | 53 +---------------
 7 files changed, 214 insertions(+), 179 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..310dd61
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+       private final ClientFactory clusterClientFactory;
+
+       public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+               this.clusterClientFactory = checkNotNull(clusterClientFactory);
+       }
+
+       @Override
+       public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+               final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+                       final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+
+                       final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+                       LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+                       return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
+               }
+       }
+}
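
The abstract executor above owns the full per-job flow (pipeline translation, cluster descriptor creation, cluster deployment, wrapping the ClusterClient in a JobClientImpl), so a concrete executor only has to supply a ClusterClientFactory. As a rough illustration only, with made-up MyClusterId/MyClusterClientFactory types standing in for a real deployment target, a subclass follows the same shape as the YARN variant further down in this commit:

    @Internal
    public class MyJobClusterExecutor extends AbstractJobClusterExecutor<MyClusterId, MyClusterClientFactory> {

            public static final String NAME = "my-job-cluster";

            public MyJobClusterExecutor() {
                    // the factory passed here drives descriptor creation, cluster
                    // specification and deployment in the abstract base class
                    super(new MyClusterClientFactory());
            }
    }
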
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..2150b18
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+       private final ClientFactory clusterClientFactory;
+
+       public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+               this.clusterClientFactory = checkNotNull(clusterClientFactory);
+       }
+
+       @Override
+       public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+               final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+                       checkState(clusterID != null);
+
+                       // the caller should take care of managing the life-cycle of the returned JobClient.
+
+                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+                       return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+               }
+       }
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..af540cb
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with methods related to job execution.
+ */
+public class ExecutorUtils {
+
+       /**
+        * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
+        *
+        * @param pipeline the pipeline whose job graph we are computing
+        * @param configuration the configuration with the necessary information such as jars and
+        *                         classpaths to be included, the parallelism of the job and potential
+        *                         savepoint settings used to bootstrap its state.
+        * @return the corresponding {@link JobGraph}.
+        */
+       public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+               checkNotNull(pipeline);
+               checkNotNull(configuration);
+
+               final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+               final JobGraph jobGraph = FlinkPipelineTranslationUtil
+                               .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+               jobGraph.addJars(executionConfigAccessor.getJars());
+               jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
+               jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+               return jobGraph;
+       }
+}
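
ExecutorUtils.getJobGraph(...) is the piece every executor shares: it translates the Pipeline and attaches the jars, classpaths and savepoint restore settings carried by the Configuration. A minimal sketch of reusing it from a custom Executor implementation (the class name is made up and the actual submission step is left out; imports match the files above):

    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.client.deployment.ExecutorUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.Executor;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    import javax.annotation.Nonnull;

    import java.util.concurrent.CompletableFuture;

    public class TranslatingExecutor implements Executor {

            @Override
            public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
                    // translation plus jar/classpath/savepoint wiring, exactly as the
                    // abstract executors above do it
                    final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);

                    // a real executor would now hand the JobGraph to a cluster; this
                    // sketch stops after the translation step
                    throw new UnsupportedOperationException(
                            "translated job " + jobGraph.getJobID() + ", submission not implemented in this sketch");
            }
    }
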
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index df4d2ba..f097323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -19,77 +19,20 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
 import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
 @Internal
-public class StandaloneSessionClusterExecutor implements Executor {
+public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
 
        public static final String NAME = "standalone-session-cluster";
 
-       private final StandaloneClientFactory clusterClientFactory;
-
        public StandaloneSessionClusterExecutor() {
-               this.clusterClientFactory = new StandaloneClientFactory();
-       }
-
-       @Override
-       public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-               final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-               final List<URL> dependencies = configAccessor.getJars();
-               final List<URL> classpaths = configAccessor.getClasspaths();
-
-               final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
-
-               try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-                       final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
-                       checkState(clusterID != null);
-
-                       final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
-                       return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-               }
-       }
-
-       private JobGraph getJobGraph(
-                       final Pipeline pipeline,
-                       final Configuration configuration,
-                       final List<URL> classpaths,
-                       final List<URL> libraries) {
-
-               checkNotNull(pipeline);
-               checkNotNull(configuration);
-               checkNotNull(classpaths);
-               checkNotNull(libraries);
-
-               final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-               final JobGraph jobGraph = FlinkPipelineTranslationUtil
-                               .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-               jobGraph.addJars(libraries);
-               jobGraph.setClasspaths(classpaths);
-               jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-               return jobGraph;
+               super(new StandaloneClientFactory());
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 5be3193..b585be6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.core.execution;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -30,11 +31,15 @@ import java.util.concurrent.CompletableFuture;
 public interface Executor {
 
        /**
-        * Executes a {@link Pipeline} based on the provided configuration.
+        * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows the
+        * caller to interact with the job being executed, e.g. cancel it or take a savepoint.
+        *
+        * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This
+        * means that e.g. {@code close()} should be called explicitly at the call-site.
         *
         * @param pipeline the {@link Pipeline} to execute
         * @param configuration the {@link Configuration} with the required execution parameters
-        * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+        * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
         */
-       CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
+       CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
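
On the caller side the new contract is: take the JobClient out of the future and release it explicitly. A hedged sketch, assuming JobClient exposes the close() the javadoc above refers to (executor, pipeline and configuration stand for whatever the call-site already has):

    final JobClient jobClient = executor.execute(pipeline, configuration).get();
    try {
            // interact with the running job here, e.g. cancel it or trigger a savepoint
            // (the concrete JobClient API is not part of this commit)
    } finally {
            // the caller owns the life-cycle of the JobClient, so close it explicitly
            jobClient.close();
    }
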
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index ead6e9a..084b020 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -19,28 +19,11 @@
 package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.deployment.executors.JobClientImpl;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The {@link Executor} to be used when executing a job in isolation.
@@ -48,56 +31,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * tear it down when the job is finished either successfully or due to an error.
  */
 @Internal
-public class YarnJobClusterExecutor implements Executor {
-
-       private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
+public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
 
        public static final String NAME = "yarn-job-cluster";
 
-       private final YarnClusterClientFactory clusterClientFactory;
-
        public YarnJobClusterExecutor() {
-               this.clusterClientFactory = new YarnClusterClientFactory();
-       }
-
-       @Override
-       public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
-
-               try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
-                       final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
-
-                       final List<URL> dependencies = configAccessor.getJars();
-                       final List<URL> classpaths = configAccessor.getClasspaths();
-
-                       final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies);
-
-                       final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
-
-                       final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
-                       LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
-                       return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
-               }
-       }
-
-       private JobGraph getJobGraph(
-                       final Pipeline pipeline,
-                       final Configuration configuration,
-                       final List<URL> classpaths,
-                       final List<URL> libraries) {
-
-               checkNotNull(pipeline);
-               checkNotNull(configuration);
-               checkNotNull(classpaths);
-               checkNotNull(libraries);
-
-               final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-               final JobGraph jobGraph = FlinkPipelineTranslationUtil
-                               .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-               jobGraph.addJars(libraries);
-               jobGraph.setClasspaths(classpaths);
-               jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-               return jobGraph;
+               super(new YarnClusterClientFactory());
        }
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index de4d148..873dce4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -19,68 +19,21 @@
 package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
 @Internal
-public class YarnSessionClusterExecutor implements Executor {
+public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
 
        public static final String NAME = "yarn-session-cluster";
 
-       private final YarnClusterClientFactory clusterClientFactory;
-
        public YarnSessionClusterExecutor() {
-               this.clusterClientFactory = new YarnClusterClientFactory();
-       }
-
-       @Override
-       public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-               final JobGraph jobGraph = getJobGraph(pipeline, configuration);
-
-               try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-                       final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
-                       checkState(clusterID != null);
-
-                       // TODO: 17.11.19 we cannot close the client here because we simply have a future of the client
-                       final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
-                       return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-               }
-       }
-
-       private JobGraph getJobGraph(
-                       final Pipeline pipeline,
-                       final Configuration configuration) {
-
-               checkNotNull(pipeline);
-               checkNotNull(configuration);
-
-               final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-               final JobGraph jobGraph = FlinkPipelineTranslationUtil
-                               .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-               jobGraph.addJars(executionConfigAccessor.getJars());
-               jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
-               jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-               return jobGraph;
+               super(new YarnClusterClientFactory());
        }
 }
