This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit df355d9c4a282ff93f8fbddacfc1c7df0f774ef3 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Sun Nov 17 13:49:02 2019 +0100 Wired verything together --- .../java/org/apache/flink/client/ClientUtils.java | 1 - .../org/apache/flink/client/cli/CliFrontend.java | 102 ++------------------- .../flink/client/cli/CliFrontendRunTest.java | 3 +- .../execution/DefaultExecutorServiceLoader.java | 2 +- 4 files changed, 10 insertions(+), 98 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 5824832..f971982 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -30,7 +30,6 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.ExecutorServiceLoader; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 258708a..8dfe306 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.deployment.ClusterClientFactory; import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.ClusterDescriptor; -import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; @@ -45,11 +44,11 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; +import org.apache.flink.core.execution.ExecutorServiceLoader; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.security.SecurityConfiguration; @@ -57,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.ShutdownHookUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; @@ -215,7 +213,7 @@ public class CliFrontend { program.getJobJarAndDependencies()); try { - runProgram(effectiveConfiguration, program); + execute(effectiveConfiguration, program); } finally { program.deleteExtractedLibraries(); } @@ -236,92 +234,6 @@ public class CliFrontend { return executionParameters.applyToConfiguration(effectiveConfiguration); } - private <ClusterID> void runProgram( - Configuration configuration, - PackagedProgram program) throws ProgramInvocationException, FlinkException { - - final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration); - checkNotNull(clusterClientFactory); - - final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration); - - try { - final ClusterID clusterId = clusterClientFactory.getClusterId(configuration); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration); - final ClusterClient<ClusterID> client; - - // directly deploy the job if the cluster is started in job mode and detached - if (clusterId == null && executionParameters.getDetachedMode()) { - int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism(); - - final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism); - - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); - client = clusterDescriptor.deployJobCluster( - clusterSpecification, - jobGraph, - executionParameters.getDetachedMode()); - - logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID()); - - try { - client.close(); - } catch (Exception e) { - LOG.info("Could not properly shut down the client.", e); - } - } else { - final Thread shutdownHook; - if (clusterId != null) { - client = clusterDescriptor.retrieve(clusterId); - shutdownHook = null; - } else { - // also in job mode we have to deploy a session cluster because the job - // might consist of multiple parts (e.g. when using collect) - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); - client = clusterDescriptor.deploySessionCluster(clusterSpecification); - // if not running in detached mode, add a shutdown hook to shut down cluster if client exits - // there's a race-condition here if cli is killed before shutdown hook is installed - if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) { - shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG); - } else { - shutdownHook = null; - } - } - - try { - int userParallelism = executionParameters.getParallelism(); - LOG.debug("User parallelism is set to {}", userParallelism); - - executeProgram(configuration, program); - } finally { - if (clusterId == null && !executionParameters.getDetachedMode()) { - // terminate the cluster only if we have started it before and if it's not detached - try { - client.shutDownCluster(); - } catch (final Exception e) { - LOG.info("Could not properly terminate the Flink cluster.", e); - } - if (shutdownHook != null) { - // we do not need the hook anymore as we have just tried to shutdown the cluster. - ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG); - } - } - try { - client.close(); - } catch (Exception e) { - LOG.info("Could not properly shut down the client.", e); - } - } - } - } finally { - try { - clusterDescriptor.close(); - } catch (Exception e) { - LOG.info("Could not properly close the cluster descriptor.", e); - } - } - } - /** * Executes the info action. * @@ -751,12 +663,14 @@ public class CliFrontend { // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- - protected void executeProgram( - Configuration configuration, - PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException { + protected void execute(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException, FlinkException { + checkNotNull(configuration); + checkNotNull(program); + logAndSysout("Starting execution of program"); - JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program); + final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader(); + final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program); if (result.isJobExecutionResult()) { logAndSysout("Program execution finished"); diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index a0d551b..50232ba 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -20,7 +20,6 @@ package org.apache.flink.client.cli; import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase { } @Override - protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) { + protected void execute(final Configuration configuration, final PackagedProgram program) { final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); assertEquals(isDetached, executionConfigAccessor.getDetachedMode()); assertEquals(expectedParallelism, executionConfigAccessor.getParallelism()); diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java index b627b71..297b17e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -32,9 +32,9 @@ import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * todo make it singleton * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses * Java service discovery to find the available {@link ExecutorFactory executor factories}. + * MAKE IT A SINGLETON. */ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {