This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3356a23251d2f980309639cbebd6f6f25144a31c
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 4 15:28:15 2019 +0100

    [FLINK-XXXXX] Add the addJars(List<URL>) in JobGraph
---
 .../main/java/org/apache/flink/client/ClientUtils.java    | 15 ---------------
 .../main/java/org/apache/flink/client/RemoteExecutor.java |  2 +-
 .../apache/flink/client/program/ContextEnvironment.java   |  2 +-
 .../apache/flink/client/program/PackagedProgramUtils.java |  3 +--
 .../java/org/apache/flink/client/program/ClientTest.java  |  2 +-
 .../java/org/apache/flink/runtime/jobgraph/JobGraph.java  | 11 +++++++++++
 .../api/environment/StreamContextEnvironment.java         |  2 +-
 .../table/client/gateway/local/ExecutionContext.java      |  3 +--
 8 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index d95da8f..d3c8928 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -36,7 +36,6 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -68,20 +67,6 @@ public enum ClientUtils {
 
        private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
 
-       /**
-        * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}. This will
-        * throw an exception if a jar URL is not valid.
-        */
-       public static void addJarFiles(JobGraph jobGraph, List<URL> jarFilesToAttach) {
-               for (URL jar : jarFilesToAttach) {
-                       try {
-                               jobGraph.addJar(new Path(jar.toURI()));
-                       } catch (URISyntaxException e) {
-                               throw new RuntimeException("URL is invalid. This should not happen.", e);
-                       }
-               }
-       }
-
        public static void checkJarFile(URL jar) throws IOException {
                File jarFile;
                try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index b601367..71fde64 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -108,7 +108,7 @@ public class RemoteExecutor extends PlanExecutor {
                                clientConfiguration,
                                getDefaultParallelism());
 
-               ClientUtils.addJarFiles(jobGraph, jarFiles);
+               jobGraph.addJars(jarFiles);
                jobGraph.setClasspaths(globalClasspaths);
 
                ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 8f9048c..08a02af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -84,7 +84,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
                                client.getFlinkConfiguration(),
                                getParallelism());
 
-               ClientUtils.addJarFiles(jobGraph, this.jarFilesToAttach);
+               jobGraph.addJars(this.jarFilesToAttach);
                jobGraph.setClasspaths(this.classpathsToAttach);
 
                if (detached) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index fa9f8b0..23840aa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.CompilerException;
@@ -61,7 +60,7 @@ public class PackagedProgramUtils {
                if (jobID != null) {
                        jobGraph.setJobID(jobID);
                }
-               ClientUtils.addJarFiles(jobGraph, packagedProgram.getAllLibraries());
+               jobGraph.addJars(packagedProgram.getAllLibraries());
                jobGraph.setClasspaths(packagedProgram.getClasspaths());
                jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6c6ec1d..74e316a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -160,7 +160,7 @@ public class ClientTest extends TestLogger {
                                new Configuration(),
                                1);
 
-               ClientUtils.addJarFiles(jobGraph, Collections.emptyList());
+               jobGraph.addJars(Collections.emptyList());
                jobGraph.setClasspaths(Collections.emptyList());
 
                JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f63a518..b404966 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -473,6 +474,16 @@ public class JobGraph implements Serializable {
                }
        }
 
+       public void addJars(List<URL> libraries) {
+               libraries.stream().map(jar -> {
+                       try {
+                               return new Path(jar.toURI());
+                       } catch (URISyntaxException e) {
+                               throw new RuntimeException("URL is invalid. This should not happen.", e);
+                       }
+               }).forEach(this::addJar);
+       }
+
        /**
         * Gets the list of assigned user jar paths.
         *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 278a75b..bab31d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -51,7 +51,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                                ctx.getClient().getFlinkConfiguration(),
                                getParallelism());
 
-               ClientUtils.addJarFiles(jobGraph, ctx.getJars());
+               jobGraph.addJars(ctx.getJars());
                jobGraph.setClasspaths(ctx.getClasspaths());
 
                // running from the CLI will override the savepoint restore settings
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 1fb0a39..b4d11a4 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CustomCommandLine;
@@ -465,7 +464,7 @@ public class ExecutionContext<ClusterID> {
                                        flinkConfig,
                                        parallelism);
 
-                       ClientUtils.addJarFiles(jobGraph, dependencies);
+                       jobGraph.addJars(dependencies);
                        jobGraph.setClasspaths(executionParameters.getClasspaths());
                        jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
 

Reply via email to