This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3356a23251d2f980309639cbebd6f6f25144a31c Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 4 15:28:15 2019 +0100 [FLINK-XXXXX] Add the addJars(List<URL>) in JobGraph --- .../main/java/org/apache/flink/client/ClientUtils.java | 15 --------------- .../main/java/org/apache/flink/client/RemoteExecutor.java | 2 +- .../apache/flink/client/program/ContextEnvironment.java | 2 +- .../apache/flink/client/program/PackagedProgramUtils.java | 3 +-- .../java/org/apache/flink/client/program/ClientTest.java | 2 +- .../java/org/apache/flink/runtime/jobgraph/JobGraph.java | 11 +++++++++++ .../api/environment/StreamContextEnvironment.java | 2 +- .../table/client/gateway/local/ExecutionContext.java | 3 +-- 8 files changed, 17 insertions(+), 23 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index d95da8f..d3c8928 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -36,7 +36,6 @@ import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -68,20 +67,6 @@ public enum ClientUtils { private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class); - /** - * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}. This will - * throw an exception if a jar URL is not valid. - */ - public static void addJarFiles(JobGraph jobGraph, List<URL> jarFilesToAttach) { - for (URL jar : jarFilesToAttach) { - try { - jobGraph.addJar(new Path(jar.toURI())); - } catch (URISyntaxException e) { - throw new RuntimeException("URL is invalid. This should not happen.", e); - } - } - } - public static void checkJarFile(URL jar) throws IOException { File jarFile; try { diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index b601367..71fde64 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -108,7 +108,7 @@ public class RemoteExecutor extends PlanExecutor { clientConfiguration, getDefaultParallelism()); - ClientUtils.addJarFiles(jobGraph, jarFiles); + jobGraph.addJars(jarFiles); jobGraph.setClasspaths(globalClasspaths); ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader( diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 8f9048c..08a02af 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -84,7 +84,7 @@ public class ContextEnvironment extends ExecutionEnvironment { client.getFlinkConfiguration(), getParallelism()); - ClientUtils.addJarFiles(jobGraph, this.jarFilesToAttach); + jobGraph.addJars(this.jarFilesToAttach); jobGraph.setClasspaths(this.classpathsToAttach); if (detached) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index fa9f8b0..23840aa 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -20,7 +20,6 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.JobID; import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.CompilerException; @@ -61,7 +60,7 @@ public class PackagedProgramUtils { if (jobID != null) { jobGraph.setJobID(jobID); } - ClientUtils.addJarFiles(jobGraph, packagedProgram.getAllLibraries()); + jobGraph.addJars(packagedProgram.getAllLibraries()); jobGraph.setClasspaths(packagedProgram.getClasspaths()); jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 6c6ec1d..74e316a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -160,7 +160,7 @@ public class ClientTest extends TestLogger { new Configuration(), 1); - ClientUtils.addJarFiles(jobGraph, Collections.emptyList()); + jobGraph.addJars(Collections.emptyList()); jobGraph.setClasspaths(Collections.emptyList()); JobSubmissionResult result = ClientUtils.submitJob(clusterClient, jobGraph); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index f63a518..b404966 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -31,6 +31,7 @@ import org.apache.flink.util.SerializedValue; import java.io.IOException; import java.io.Serializable; +import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -473,6 +474,16 @@ public class JobGraph implements Serializable { } } + public void addJars(List<URL> libraries) { + libraries.stream().map(jar -> { + try { + return new Path(jar.toURI()); + } catch (URISyntaxException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + }).forEach(this::addJar); + } + /** * Gets the list of assigned user jar paths. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 278a75b..bab31d3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -51,7 +51,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { ctx.getClient().getFlinkConfiguration(), getParallelism()); - ClientUtils.addJarFiles(jobGraph, ctx.getJars()); + jobGraph.addJars(ctx.getJars()); jobGraph.setClasspaths(ctx.getClasspaths()); // running from the CLI will override the savepoint restore settings diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 1fb0a39..b4d11a4 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.cli.CliArgsException; import org.apache.flink.client.cli.CustomCommandLine; @@ -465,7 +464,7 @@ public class ExecutionContext<ClusterID> { flinkConfig, parallelism); - ClientUtils.addJarFiles(jobGraph, dependencies); + jobGraph.addJars(dependencies); jobGraph.setClasspaths(executionParameters.getClasspaths()); jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());