This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 79f7a53e3aa71026c87855d7fdcc0568e085b797
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Tue Nov 5 11:09:24 2019 +0100

    Session and per-job executors
---
 .../org/apache/flink/client/cli/CliFrontend.java |   1 +
 .../executors/PerJobClusterExecutor.java         | 179 +++++++++++++++++++++
 .../executors/SessionClusterExecutor.java        |  98 +++++++++++
 .../flink/client/program/PackagedProgram.java    |   8 +-
 4 files changed, 284 insertions(+), 2 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a1da66d..a6ef7f4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -695,6 +695,7 @@ public class CliFrontend {
 			new PackagedProgram(jarFile, classpaths, programArgs) :
 			new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
 
+		// TODO: 04.11.19 this maybe can go
 		program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
 
 		return program;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
new file mode 100644
index 0000000..487c3fa
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ */
+public class PerJobClusterExecutor<ClusterID> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterExecutor.class);
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public PerJobClusterExecutor() {
+		this(new DefaultClusterClientServiceLoader());
+	}
+
+	public PerJobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public JobExecutionResult execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+		final boolean attached = executionConfig.get(ExecutionOptions.ATTACHED);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+			return attached
+					? executeAttached(clusterClientFactory, clusterDescriptor, pipeline, executionConfig)
+					: executeDetached(clusterClientFactory, clusterDescriptor, pipeline, executionConfig);
+		}
+	}
+
+	private JobExecutionResult executeDetached(
+			final ClusterClientFactory<ClusterID> clusterClientFactory,
+			final ClusterDescriptor<ClusterID> clusterDescriptor,
+			final Pipeline pipeline,
+			final Configuration executionConfig) throws Exception {
+
+		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+		final List<URL> classpaths = configAccessor.getClasspaths();
+		final URL jarFileUrl = configAccessor.getJarFilePath();
+
+		final List<File> extractedLibs = PackagedProgram.extractContainedLibraries(jarFileUrl);
+		final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
+
+		final List<URL> libraries = jarFileUrl == null
+				? Collections.emptyList()
+				: PackagedProgram.getAllLibraries(jarFileUrl, extractedLibs, isPython);
+
+		final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, libraries);
+
+		final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
+
+		try (final ClusterClient<ClusterID> ignored = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true)) {
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+		}
+		return new DetachedJobExecutionResult(jobGraph.getJobID());
+	}
+
+	private JobExecutionResult executeAttached(
+			final ClusterClientFactory<ClusterID> clusterClientFactory,
+			final ClusterDescriptor<ClusterID> clusterDescriptor,
+			final Pipeline pipeline,
+			final Configuration executionConfig) throws Exception {
+
+		ClusterClient<ClusterID> clusterClient = null;
+		Thread shutdownHook = null;
+
+		try {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
+
+			clusterClient = clusterDescriptor.deploySessionCluster(clusterSpecification);
+			shutdownHook = configAccessor.isShutdownOnAttachedExit()
+					? ShutdownHookUtil.addShutdownHook(clusterClient::shutDownCluster, clusterClient.getClass().getSimpleName(), LOG)
+					: null;
+
+			final List<URL> classpaths = configAccessor.getClasspaths();
+			final URL jarFileUrl = configAccessor.getJarFilePath();
+
+			final List<File> extractedLibs = PackagedProgram.extractContainedLibraries(jarFileUrl);
+			final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
+
+			final List<URL> libraries = jarFileUrl == null
+					? Collections.emptyList()
+					: PackagedProgram.getAllLibraries(jarFileUrl, extractedLibs, isPython);
+
+			final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(libraries, classpaths, getClass().getClassLoader());
+
+			final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, libraries);
+
+			checkState(!configAccessor.getDetachedMode());
+			return ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, userClassLoader).getJobExecutionResult();
+		} finally {
+			if (clusterClient != null) {
+				clusterClient.shutDownCluster();
+
+				if (shutdownHook != null) {
+					ShutdownHookUtil.removeShutdownHook(shutdownHook, clusterClient.getClass().getSimpleName(), LOG);
+				}
+				clusterClient.close();
+			}
+		}
+	}
+
+	private JobGraph getJobGraph(
+			final Pipeline pipeline,
+			final Configuration configuration,
+			final List<URL> classpaths,
+			final List<URL> libraries) {
+
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+		checkNotNull(classpaths);
+		checkNotNull(libraries);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+				pipeline,
+				configuration,
+				executionConfigAccessor.getParallelism());
+		jobGraph.addJars(libraries);
+		jobGraph.setClasspaths(classpaths);
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
new file mode 100644
index 0000000..5d8449e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+public class SessionClusterExecutor<ClusterID> implements Executor {
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public SessionClusterExecutor() {
+		this(new DefaultClusterClientServiceLoader());
+	}
+
+	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public JobExecutionResult execute(final Pipeline pipeline, final Configuration executionConfig) throws Exception {
+		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+			final ClusterID clusterID = clusterClientFactory.getClusterId(executionConfig);
+			checkState(clusterID != null);
+
+			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+
+				final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+						pipeline,
+						clusterClient.getFlinkConfiguration(),
+						configAccessor.getParallelism());
+
+				final List<URL> classpaths = configAccessor.getClasspaths();
+				final URL jarFileUrl = configAccessor.getJarFilePath();
+
+				final List<File> extractedLibs = PackagedProgram.extractContainedLibraries(jarFileUrl);
+				final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
+				final List<URL> libraries = jarFileUrl == null
+						? Collections.emptyList()
+						: PackagedProgram.getAllLibraries(jarFileUrl, extractedLibs, isPython);
+
+				final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(libraries, classpaths, getClass().getClassLoader());
+
+				jobGraph.addJars(libraries);
+				jobGraph.setClasspaths(classpaths);
+				jobGraph.setSavepointRestoreSettings(configAccessor.getSavepointRestoreSettings());
+
+				return configAccessor.getDetachedMode()
+						? ClientUtils.submitJob(clusterClient, jobGraph)
+						: ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, userClassLoader).getJobExecutionResult();
+			}
+		}
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2f765b1..cbd79b9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -306,12 +306,16 @@ public class PackagedProgram {
 	 * Returns all provided libraries needed to run the program.
 	 */
 	public List<URL> getAllLibraries() {
-		List<URL> libs = new ArrayList<URL>(this.extractedTempLibraries.size() + 1);
+		return getAllLibraries(this.jarFile, this.extractedTempLibraries, isPython);
+	}
+
+	public static List<URL> getAllLibraries(URL jarFile, List<File> extractedTempLibraries, boolean isPython) {
+		List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);
 
 		if (jarFile != null) {
 			libs.add(jarFile);
 		}
-		for (File tmpLib : this.extractedTempLibraries) {
+		for (File tmpLib : extractedTempLibraries) {
 			try {
 				libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
 			}
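
For context, a minimal usage sketch of the two executors introduced above. It assumes the caller already holds a translated Pipeline and a Configuration describing the deployment target (neither is part of this commit), and the ExecutorUsageSketch class and its run method are hypothetical names used only for illustration.

package org.apache.flink.client.deployment.executors;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;

// Illustration only -- not part of the commit above.
public final class ExecutorUsageSketch {

	private ExecutorUsageSketch() {
	}

	public static JobExecutionResult run(
			final Pipeline pipeline,
			final Configuration executionConfig,
			final boolean perJob) throws Exception {

		// PerJobClusterExecutor reads this flag to choose between attached and detached submission.
		executionConfig.setBoolean(ExecutionOptions.ATTACHED, true);

		// Per-job: deploy a dedicated cluster for this pipeline and tear it down when it finishes.
		// Session: the configuration is assumed to identify an already running cluster.
		return perJob
				? new PerJobClusterExecutor<>().execute(pipeline, executionConfig)
				: new SessionClusterExecutor<>().execute(pipeline, executionConfig);
	}
}

How the framework itself chooses between the two executors is not shown in this commit.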