This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45b36e73e3376fd5736b81313259d6b2bd97336b
Author: Kostas Kloudas <[email protected]>
AuthorDate: Fri Nov 15 15:36:55 2019 +0100

    [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader
---
 .../java/org/apache/flink/client/ClientUtils.java  | 37 ++++++++++------------
 1 file changed, 17 insertions(+), 20 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 2c80236..ac247ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
@@ -44,6 +45,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.jar.JarFile;
@@ -99,15 +101,20 @@ public enum ClientUtils {
                return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
        }
 
-       public static JobExecutionResult submitJob(
-                       ClusterClient<?> client,
-                       JobGraph jobGraph) throws ProgramInvocationException {
-               checkNotNull(client);
-               checkNotNull(jobGraph);
+       public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+               return checkNotNull(client)
+                               .submitJob(checkNotNull(jobGraph))
+                               .thenApply(JobSubmissionResult::getJobID);
+       }
+
+       public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
+               return submitJobAndGetJobID(client, jobGraph)
+                               .thenCompose(client::requestJobResult);
+       }
+
+       public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
                try {
-                       return client
-                               .submitJob(jobGraph)
-                               .thenApply(JobSubmissionResult::getJobID)
+                       return submitJobAndGetJobID(client, jobGraph)
                                .thenApply(DetachedJobExecutionResult::new)
                                .get();
                } catch (InterruptedException | ExecutionException e) {
@@ -120,18 +127,11 @@ public enum ClientUtils {
                        ClusterClient<?> client,
                        JobGraph jobGraph,
                        ClassLoader classLoader) throws ProgramInvocationException {
-               checkNotNull(client);
-               checkNotNull(jobGraph);
                checkNotNull(classLoader);
 
                JobResult jobResult;
-
                try {
-                       jobResult = client
-                               .submitJob(jobGraph)
-                               .thenApply(JobSubmissionResult::getJobID)
-                               .thenCompose(client::requestJobResult)
-                               .get();
+                       jobResult = submitJobAndGetResult(client, jobGraph).get();
                } catch (InterruptedException | ExecutionException e) {
                        ExceptionUtils.checkInterrupted(e);
                        throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
@@ -151,12 +151,9 @@ public enum ClientUtils {
 
                final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 
-               final List<URL> jobJars = executionConfigAccessor.getJars();
-               final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-
                final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
-
                final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
                try {
                        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 

Reply via email to