This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6dc466c0f9d362082524d027d288d315d5c9e15e Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 11 14:36:05 2019 +0100 WIP --- .../java/org/apache/flink/client/ClientUtils.java | 5 ++++ .../executors/PerJobClusterExecutor.java | 31 +++------------------- .../executors/SessionClusterExecutor.java | 19 ++----------- 3 files changed, 10 insertions(+), 45 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index b5537e2..96a02ea 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -29,8 +29,10 @@ import org.apache.flink.client.program.DetachedJobExecutionResult; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.ExecutorServiceLoader; import org.apache.flink.runtime.client.JobExecutionException; @@ -183,6 +185,9 @@ public enum ClientUtils { final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>(); + // TODO: 11.11.19 this should move to the appropriate place +// ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString); + final ContextEnvironmentFactory factory = new ContextEnvironmentFactory( executorServiceLoader, configuration, diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java index 2d9dafb..ce6dc92 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java @@ -30,10 +30,8 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.DetachedJobExecutionResult; -import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.ShutdownHookUtil; @@ -41,10 +39,7 @@ import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -91,17 +86,7 @@ public class PerJobClusterExecutor<ClusterID> implements Executor { final List<URL> classpaths = configAccessor.getClasspaths(); final List<URL> jarFileUrls = configAccessor.getJarFilePaths(); - final List<File> extractedLibs = new ArrayList<>(); - for (URL jarFileUrl : jarFileUrls) { - extractedLibs.addAll(PackagedProgram.extractContainedLibraries(jarFileUrl)); - } - final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON); - - final List<URL> libraries = jarFileUrls.isEmpty() - ? Collections.emptyList() - : PackagedProgram.getAllLibraries(jarFileUrls.get(0), extractedLibs, isPython); - - final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, libraries); + final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, jarFileUrls); final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig); @@ -125,19 +110,9 @@ public class PerJobClusterExecutor<ClusterID> implements Executor { final List<URL> classpaths = configAccessor.getClasspaths(); final List<URL> jarFileUrls = configAccessor.getJarFilePaths(); - final List<File> extractedLibs = new ArrayList<>(); - for (URL jarFileUrl : jarFileUrls) { - extractedLibs.addAll(PackagedProgram.extractContainedLibraries(jarFileUrl)); - } - final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON); - - final List<URL> libraries = jarFileUrls.isEmpty() - ? Collections.emptyList() - : PackagedProgram.getAllLibraries(jarFileUrls.get(0), extractedLibs, isPython); - - final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, libraries); + final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, jarFileUrls); - final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(libraries, classpaths, getClass().getClassLoader()); + final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(jarFileUrls, classpaths, getClass().getClassLoader()); final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java index 84bbaa6..8b7bfb1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java @@ -28,16 +28,11 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.runtime.jobgraph.JobGraph; -import java.io.File; import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,19 +59,9 @@ public class SessionClusterExecutor<ClusterID> implements Executor { final List<URL> classpaths = configAccessor.getClasspaths(); final List<URL> jarFileUrls = configAccessor.getJarFilePaths(); - final List<File> extractedLibs = new ArrayList<>(); - for (URL jarFileUrl : jarFileUrls) { - extractedLibs.addAll(PackagedProgram.extractContainedLibraries(jarFileUrl)); - } - final boolean isPython = executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON); - - final List<URL> libraries = jarFileUrls.isEmpty() - ? Collections.emptyList() - : PackagedProgram.getAllLibraries(jarFileUrls.get(0), extractedLibs, isPython); - - final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, libraries); + final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, jarFileUrls); - final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(libraries, classpaths, getClass().getClassLoader()); + final ClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(jarFileUrls, classpaths, getClass().getClassLoader()); final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig); try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {