This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6dc466c0f9d362082524d027d288d315d5c9e15e
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 11 14:36:05 2019 +0100

    WIP
---
 .../java/org/apache/flink/client/ClientUtils.java  |  5 ++++
 .../executors/PerJobClusterExecutor.java           | 31 +++-------------------
 .../executors/SessionClusterExecutor.java          | 19 ++-----------
 3 files changed, 10 insertions(+), 45 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index b5537e2..96a02ea 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -29,8 +29,10 @@ import 
org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -183,6 +185,9 @@ public enum ClientUtils {
 
                        final AtomicReference<JobExecutionResult> 
jobExecutionResult = new AtomicReference<>();
 
+                       // TODO: 11.11.19 this should move to the appropriate 
place
+//                     ConfigUtils.encodeStreamToConfig(configuration, 
PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
+
                        final ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(
                                        executorServiceLoader,
                                        configuration,
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
index 2d9dafb..ce6dc92 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PerJobClusterExecutor.java
@@ -30,10 +30,8 @@ import 
org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.DetachedJobExecutionResult;
-import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -41,10 +39,7 @@ import org.apache.flink.util.ShutdownHookUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -91,17 +86,7 @@ public class PerJobClusterExecutor<ClusterID> implements 
Executor {
                final List<URL> classpaths = configAccessor.getClasspaths();
                final List<URL> jarFileUrls = configAccessor.getJarFilePaths();
 
-               final List<File> extractedLibs = new ArrayList<>();
-               for (URL jarFileUrl : jarFileUrls) {
-                       
extractedLibs.addAll(PackagedProgram.extractContainedLibraries(jarFileUrl));
-               }
-               final boolean isPython = 
executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
-
-               final List<URL> libraries = jarFileUrls.isEmpty()
-                               ? Collections.emptyList()
-                               : 
PackagedProgram.getAllLibraries(jarFileUrls.get(0), extractedLibs, isPython);
-
-               final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, libraries);
+               final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, jarFileUrls);
 
                final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
 
@@ -125,19 +110,9 @@ public class PerJobClusterExecutor<ClusterID> implements 
Executor {
                        final List<URL> classpaths = 
configAccessor.getClasspaths();
                        final List<URL> jarFileUrls = 
configAccessor.getJarFilePaths();
 
-                       final List<File> extractedLibs = new ArrayList<>();
-                       for (URL jarFileUrl : jarFileUrls) {
-                               
extractedLibs.addAll(PackagedProgram.extractContainedLibraries(jarFileUrl));
-                       }
-                       final boolean isPython = 
executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
-
-                       final List<URL> libraries = jarFileUrls.isEmpty()
-                                       ? Collections.emptyList()
-                                       : 
PackagedProgram.getAllLibraries(jarFileUrls.get(0), extractedLibs, isPython);
-
-                       final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, libraries);
+                       final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, jarFileUrls);
 
-                       final ClassLoader userClassLoader = 
ClientUtils.buildUserCodeClassLoader(libraries, classpaths, 
getClass().getClassLoader());
+                       final ClassLoader userClassLoader = 
ClientUtils.buildUserCodeClassLoader(jarFileUrls, classpaths, 
getClass().getClassLoader());
 
                        final ClusterSpecification clusterSpecification = 
clusterClientFactory.getClusterSpecification(executionConfig);
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
index 84bbaa6..8b7bfb1 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -28,16 +28,11 @@ import 
org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
-import java.io.File;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,19 +59,9 @@ public class SessionClusterExecutor<ClusterID> implements 
Executor {
                final List<URL> classpaths = configAccessor.getClasspaths();
                final List<URL> jarFileUrls = configAccessor.getJarFilePaths();
 
-               final List<File> extractedLibs = new ArrayList<>();
-               for (URL jarFileUrl : jarFileUrls) {
-                       
extractedLibs.addAll(PackagedProgram.extractContainedLibraries(jarFileUrl));
-               }
-               final boolean isPython = 
executionConfig.getBoolean(PipelineOptions.Internal.IS_PYTHON);
-
-               final List<URL> libraries = jarFileUrls.isEmpty()
-                               ? Collections.emptyList()
-                               : 
PackagedProgram.getAllLibraries(jarFileUrls.get(0), extractedLibs, isPython);
-
-               final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, libraries);
+               final JobGraph jobGraph = getJobGraph(pipeline, 
executionConfig, classpaths, jarFileUrls);
 
-               final ClassLoader userClassLoader = 
ClientUtils.buildUserCodeClassLoader(libraries, classpaths, 
getClass().getClassLoader());
+               final ClassLoader userClassLoader = 
ClientUtils.buildUserCodeClassLoader(jarFileUrls, classpaths, 
getClass().getClassLoader());
 
                final ClusterClientFactory<ClusterID> clusterClientFactory = 
clusterClientServiceLoader.getClusterClientFactory(executionConfig);
                try (final ClusterDescriptor<ClusterID> clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(executionConfig)) {

Reply via email to