This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df355d9c4a282ff93f8fbddacfc1c7df0f774ef3
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Sun Nov 17 13:49:02 2019 +0100

    Wired everything together
---
 .../java/org/apache/flink/client/ClientUtils.java  |   1 -
 .../org/apache/flink/client/cli/CliFrontend.java   | 102 ++-------------------
 .../flink/client/cli/CliFrontendRunTest.java       |   3 +-
 .../execution/DefaultExecutorServiceLoader.java    |   2 +-
 4 files changed, 10 insertions(+), 98 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5824832..f971982 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 258708a..8dfe306 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -45,11 +44,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -215,7 +213,7 @@ public class CliFrontend {
                                program.getJobJarAndDependencies());
 
                try {
-                       runProgram(effectiveConfiguration, program);
+                       execute(effectiveConfiguration, program);
                } finally {
                        program.deleteExtractedLibraries();
                }
@@ -236,92 +234,6 @@ public class CliFrontend {
                return 
executionParameters.applyToConfiguration(effectiveConfiguration);
        }
 
-       private <ClusterID> void runProgram(
-                       Configuration configuration,
-                       PackagedProgram program) throws 
ProgramInvocationException, FlinkException {
-
-               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(configuration);
-               checkNotNull(clusterClientFactory);
-
-               final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(configuration);
-
-               try {
-                       final ClusterID clusterId = 
clusterClientFactory.getClusterId(configuration);
-                       final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromConfiguration(configuration);
-                       final ClusterClient<ClusterID> client;
-
-                       // directly deploy the job if the cluster is started in 
job mode and detached
-                       if (clusterId == null && 
executionParameters.getDetachedMode()) {
-                               int parallelism = 
executionParameters.getParallelism() == -1 ? defaultParallelism : 
executionParameters.getParallelism();
-
-                               final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
-                               final ClusterSpecification clusterSpecification 
= clusterClientFactory.getClusterSpecification(configuration);
-                               client = clusterDescriptor.deployJobCluster(
-                                       clusterSpecification,
-                                       jobGraph,
-                                       executionParameters.getDetachedMode());
-
-                               logAndSysout("Job has been submitted with JobID 
" + jobGraph.getJobID());
-
-                               try {
-                                       client.close();
-                               } catch (Exception e) {
-                                       LOG.info("Could not properly shut down 
the client.", e);
-                               }
-                       } else {
-                               final Thread shutdownHook;
-                               if (clusterId != null) {
-                                       client = 
clusterDescriptor.retrieve(clusterId);
-                                       shutdownHook = null;
-                               } else {
-                                       // also in job mode we have to deploy a 
session cluster because the job
-                                       // might consist of multiple parts 
(e.g. when using collect)
-                                       final ClusterSpecification 
clusterSpecification = 
clusterClientFactory.getClusterSpecification(configuration);
-                                       client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
-                                       // if not running in detached mode, add 
a shutdown hook to shut down cluster if client exits
-                                       // there's a race-condition here if cli 
is killed before shutdown hook is installed
-                                       if 
(!executionParameters.getDetachedMode() && 
executionParameters.isShutdownOnAttachedExit()) {
-                                               shutdownHook = 
ShutdownHookUtil.addShutdownHook(client::shutDownCluster, 
client.getClass().getSimpleName(), LOG);
-                                       } else {
-                                               shutdownHook = null;
-                                       }
-                               }
-
-                               try {
-                                       int userParallelism = 
executionParameters.getParallelism();
-                                       LOG.debug("User parallelism is set to 
{}", userParallelism);
-
-                                       executeProgram(configuration, program);
-                               } finally {
-                                       if (clusterId == null && 
!executionParameters.getDetachedMode()) {
-                                               // terminate the cluster only 
if we have started it before and if it's not detached
-                                               try {
-                                                       
client.shutDownCluster();
-                                               } catch (final Exception e) {
-                                                       LOG.info("Could not 
properly terminate the Flink cluster.", e);
-                                               }
-                                               if (shutdownHook != null) {
-                                                       // we do not need the 
hook anymore as we have just tried to shutdown the cluster.
-                                                       
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
client.getClass().getSimpleName(), LOG);
-                                               }
-                                       }
-                                       try {
-                                               client.close();
-                                       } catch (Exception e) {
-                                               LOG.info("Could not properly 
shut down the client.", e);
-                                       }
-                               }
-                       }
-               } finally {
-                       try {
-                               clusterDescriptor.close();
-                       } catch (Exception e) {
-                               LOG.info("Could not properly close the cluster 
descriptor.", e);
-                       }
-               }
-       }
-
        /**
         * Executes the info action.
         *
@@ -751,12 +663,14 @@ public class CliFrontend {
        //  Interaction with programs and JobManager
        // 
--------------------------------------------------------------------------------------------
 
-       protected void executeProgram(
-                       Configuration configuration,
-                       PackagedProgram program) throws 
ProgramMissingJobException, ProgramInvocationException {
+       protected void execute(final Configuration configuration, final 
PackagedProgram program) throws ProgramInvocationException, FlinkException {
+               checkNotNull(configuration);
+               checkNotNull(program);
+
                logAndSysout("Starting execution of program");
 
-               JobSubmissionResult result = ClientUtils.executeProgram(new 
DefaultExecutorServiceLoader(), configuration, program);
+               final ExecutorServiceLoader executorServiceLoader = new 
DefaultExecutorServiceLoader();
+               final JobSubmissionResult result = 
ClientUtils.executeProgram(executorServiceLoader, configuration, program);
 
                if (result.isJobExecutionResult()) {
                        logAndSysout("Program execution finished");
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..50232ba 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase 
{
                }
 
                @Override
-               protected void executeProgram(Configuration configuration, 
PackagedProgram program, ClusterClient client) {
+               protected void execute(final Configuration configuration, final 
PackagedProgram program) {
                        final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
                        assertEquals(isDetached, 
executionConfigAccessor.getDetachedMode());
                        assertEquals(expectedParallelism, 
executionConfigAccessor.getParallelism());
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index b627b71..297b17e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,9 +32,9 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * todo make it singleton
  * The default implementation of the {@link ExecutorServiceLoader}. This 
implementation uses
  * Java service discovery to find the available {@link ExecutorFactory 
executor factories}.
+ * MAKE IT A SINGLETON.
  */
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 

Reply via email to