This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79f7a53e3aa71026c87855d7fdcc0568e085b797
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Tue Nov 5 11:09:24 2019 +0100

    Session and per-job executors
---
 .../org/apache/flink/client/cli/CliFrontend.java   |   1 +
 .../executors/PerJobClusterExecutor.java           | 179 +++++++++++++++++++++
 .../executors/SessionClusterExecutor.java          |  98 +++++++++++
 .../flink/client/program/PackagedProgram.java      |   8 +-
 4 files changed, 284 insertions(+), 2 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a1da66d..a6ef7f4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -695,6 +695,7 @@ public class CliFrontend {
                                new PackagedProgram(jarFile, classpaths, 
programArgs) :
                                new PackagedProgram(jarFile, classpaths, 
entryPointClass, programArgs);
 
+               // TODO: 04.11.19 this call can probably be removed
                
program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
 
                return program;
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
new file mode 100644
index 0000000..487c3fa
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an 
error.
+ */
+public class PerJobClusterExecutor<ClusterID> implements Executor {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PerJobClusterExecutor.class);
+
+       private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+       public PerJobClusterExecutor() {
+               this(new DefaultClusterClientServiceLoader());
+       }
+
+       public PerJobClusterExecutor(final ClusterClientServiceLoader 
clusterClientServiceLoader) {
+               this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
+       }
+
+       @Override
+       public JobExecutionResult execute(Pipeline pipeline, Configuration 
executionConfig) throws Exception {
+               final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+               final boolean attached = 
executionConfig.get(ExecutionOptions.ATTACHED);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(executionConfig)) {
+                       return attached
+                                       ? executeAttached(clusterClientFactory, 
clusterDescriptor, pipeline, executionConfig)
+                                       : executeDetached(clusterClientFactory, 
clusterDescriptor, pipeline, executionConfig);
+               }
+       }
+
+       private JobExecutionResult executeDetached(
+                       final ClusterClientFactory<ClusterID> 
clusterClientFactory,
+                       final ClusterDescriptor<ClusterID> clusterDescriptor,
+                       final Pipeline pipeline,
+                       final Configuration executionConfig) throws Exception {
+
+               final ExecutionConfigAccessor configAccessor = 
ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+               final List<URL> classpaths = configAccessor.getClasspaths();
+               final URL jarFileUrl = configAccessor.getJarFilePath();
+
+               final List<File> extractedLibs = 
PackagedProgram.extractContainedLibraries(jarFileUrl);
+               final boolean isPython = 
executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
+
+               final List<URL> libraries = jarFileUrl == null
+                               ? Collections.emptyList()
+                               : PackagedProgram.getAllLibraries(jarFileUrl, 
extractedLibs, isPython);
+
+               final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, libraries);
+
+               final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
+
+               try (final ClusterClient<ClusterID> ignored = 
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true)) {
+                       LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
+               }
+               return new DetachedJobExecutionResult(jobGraph.getJobID());
+       }
+
+       private JobExecutionResult executeAttached(
+                       final ClusterClientFactory<ClusterID> 
clusterClientFactory,
+                       final ClusterDescriptor<ClusterID> clusterDescriptor,
+                       final Pipeline pipeline,
+                       final Configuration executionConfig) throws Exception {
+
+               ClusterClient<ClusterID> clusterClient = null;
+               Thread shutdownHook = null;
+
+               try {
+                       final ExecutionConfigAccessor configAccessor = 
ExecutionConfigAccessor.fromConfiguration(executionConfig);
+                       final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
+
+                       clusterClient = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
+                       shutdownHook = configAccessor.isShutdownOnAttachedExit()
+                                       ? 
ShutdownHookUtil.addShutdownHook(clusterClient::shutDownCluster, 
clusterClient.getClass().getSimpleName(), LOG)
+                                       : null;
+
+                       final List<URL> classpaths = 
configAccessor.getClasspaths();
+                       final URL jarFileUrl = configAccessor.getJarFilePath();
+
+                       final List<File> extractedLibs = 
PackagedProgram.extractContainedLibraries(jarFileUrl);
+                       final boolean isPython = 
executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
+
+                       final List<URL> libraries = jarFileUrl == null
+                                       ? Collections.emptyList()
+                                       : 
PackagedProgram.getAllLibraries(jarFileUrl, extractedLibs, isPython);
+
+                       final ClassLoader userClassLoader = 
ClientUtils.buildUserCodeClassLoader(libraries, classpaths, 
getClass().getClassLoader());
+
+                       final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, libraries);
+
+                       checkState(!configAccessor.getDetachedMode());
+                       return 
ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, 
userClassLoader).getJobExecutionResult();
+               } finally {
+                       if (clusterClient != null) {
+                               clusterClient.shutDownCluster();
+
+                               if (shutdownHook != null) {
+                                       
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
clusterClient.getClass().getSimpleName(), LOG);
+                               }
+                               clusterClient.close();
+                       }
+               }
+       }
+
+       private JobGraph getJobGraph(
+                       final Pipeline pipeline,
+                       final Configuration configuration,
+                       final List<URL> classpaths,
+                       final List<URL> libraries) {
+
+               checkNotNull(pipeline);
+               checkNotNull(configuration);
+               checkNotNull(classpaths);
+               checkNotNull(libraries);
+
+               final ExecutionConfigAccessor executionConfigAccessor = 
ExecutionConfigAccessor.fromConfiguration(configuration);
+               final JobGraph jobGraph = 
FlinkPipelineTranslationUtil.getJobGraph(
+                               pipeline,
+                               configuration,
+                               executionConfigAccessor.getParallelism());
+               jobGraph.addJars(libraries);
+               jobGraph.setClasspaths(classpaths);
+               
jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+               return jobGraph;
+       }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
new file mode 100644
index 0000000..5d8449e
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ *
+ * <p>The target cluster is the one whose id the {@link ClusterClientFactory} reads from the
+ * execution configuration.
+ *
+ * @param <ClusterID> the type of the id of the cluster the job is submitted to
+ */
+public class SessionClusterExecutor<ClusterID> implements Executor {
+
+       private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+       /**
+        * Creates an executor that looks up cluster client factories through a
+        * {@link DefaultClusterClientServiceLoader}.
+        */
+       public SessionClusterExecutor() {
+               this(new DefaultClusterClientServiceLoader());
+       }
+
+       /**
+        * Creates an executor that looks up cluster client factories through the given loader.
+        *
+        * @param clusterClientServiceLoader the loader used to obtain the {@link ClusterClientFactory}; must not be null
+        */
+       public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+               this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+       }
+
+       /**
+        * Submits the given pipeline to the cluster identified by the configuration.
+        *
+        * <p>In detached mode the job is only submitted; otherwise the call waits for the job result.
+        *
+        * @param pipeline the pipeline to translate into a {@link JobGraph} and run
+        * @param executionConfig the configuration describing the cluster and the job
+        * @return the result of the submission or of the job execution
+        * @throws Exception if the cluster cannot be retrieved or the submission or execution fails
+        */
+       @Override
+       public JobExecutionResult execute(final Pipeline pipeline, final Configuration executionConfig) throws Exception {
+               final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+               final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+                       final ClusterID clusterID = clusterClientFactory.getClusterId(executionConfig);
+                       checkState(clusterID != null, "The configuration does not identify a cluster to submit the job to.");
+
+                       try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+
+                               final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
+                                               pipeline,
+                                               clusterClient.getFlinkConfiguration(),
+                                               configAccessor.getParallelism());
+
+                               final List<URL> classpaths = configAccessor.getClasspaths();
+                               final URL jarFileUrl = configAccessor.getJarFilePath();
+
+                               // Without a jar there is nothing to extract; do not hand a null URL to the extraction.
+                               final List<URL> libraries;
+                               if (jarFileUrl == null) {
+                                       libraries = Collections.emptyList();
+                               } else {
+                                       final List<File> extractedLibs = PackagedProgram.extractContainedLibraries(jarFileUrl);
+                                       final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
+                                       libraries = PackagedProgram.getAllLibraries(jarFileUrl, extractedLibs, isPython);
+                               }
+
+                               final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(libraries, classpaths, getClass().getClassLoader());
+
+                               jobGraph.addJars(libraries);
+                               jobGraph.setClasspaths(classpaths);
+                               jobGraph.setSavepointRestoreSettings(configAccessor.getSavepointRestoreSettings());
+
+                               return configAccessor.getDetachedMode()
+                                               ? ClientUtils.submitJob(clusterClient, jobGraph)
+                                               : ClientUtils.submitJobAndWaitForResult(clusterClient, jobGraph, userClassLoader).getJobExecutionResult();
+                       }
+               }
+       }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2f765b1..cbd79b9 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -306,12 +306,16 @@ public class PackagedProgram {
         * Returns all provided libraries needed to run the program.
         */
        public List<URL> getAllLibraries() {
-               List<URL> libs = new 
ArrayList<URL>(this.extractedTempLibraries.size() + 1);
+               return getAllLibraries(this.jarFile, 
this.extractedTempLibraries, isPython);
+       }
+
+       public static List<URL> getAllLibraries(URL jarFile, List<File> 
extractedTempLibraries, boolean isPython) {
+               List<URL> libs = new 
ArrayList<URL>(extractedTempLibraries.size() + 1);
 
                if (jarFile != null) {
                        libs.add(jarFile);
                }
-               for (File tmpLib : this.extractedTempLibraries) {
+               for (File tmpLib : extractedTempLibraries) {
                        try {
                                
libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
                        }

Reply via email to