This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ab86e26  [hotfix][yarn] userJarFiles can be a local variable
ab86e26 is described below

commit ab86e26807c6151fbd9f8abd6177537c04dde0c2
Author: tison <[email protected]>
AuthorDate: Thu Apr 25 13:44:26 2019 +0800

    [hotfix][yarn] userJarFiles can be a local variable
    
    This closes #8264.
---
 .../apache/flink/yarn/AbstractYarnClusterDescriptor.java | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ec0b66f..ea707ac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -100,6 +100,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
 import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
@@ -143,10 +144,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
        private String applicationType;
 
-       /** Optional Jar file to include in the system class loader of all application nodes
-        * (for per-job submission). */
-       private final Set<File> userJarFiles = new HashSet<>();
-
        private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
        public AbstractYarnClusterDescriptor(
@@ -739,12 +736,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                                        1));
                }
 
-               if (jobGraph != null) {
-                       // add the user code jars from the provided JobGraph
-                       for (org.apache.flink.core.fs.Path path : jobGraph.getUserJars()) {
-                               userJarFiles.add(new File(path.toUri()));
-                       }
-               }
+               final Set<File> userJarFiles = (jobGraph == null)
+                       // not per-job submission
+                       ? Collections.emptySet()
+                       // add user code jars from the provided JobGraph
+                       : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
 
                // local resource map for Yarn
                final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());

Reply via email to