This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 17e2747  [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
17e2747 is described below

commit 17e2747e575cc4d9847be76b5ce9a75a7b6707f7
Author: David Moravek <david.mora...@firma.seznam.cz>
AuthorDate: Fri Jul 26 17:04:57 2019 +0200

    [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship.
    
    This closes #9022.
---
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 99 +++++++++++-----------
 .../flink/yarn/YarnClusterDescriptorTest.java      |  6 +-
 .../org/apache/flink/yarn/YarnFileStageTest.java   | 13 ++-
 3 files changed, 66 insertions(+), 52 deletions(-)

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 5b2bc2a..23d1d3a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -83,7 +83,6 @@ import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.URISyntaxException;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.SimpleFileVisitor;
@@ -1086,8 +1085,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        Path localSrcPath,
                        Map<String, LocalResource> localResources,
                        Path targetHomeDir,
-                       String relativeTargetPath) throws IOException, 
URISyntaxException {
-
+                       String relativeTargetPath) throws IOException {
                Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
                        fs,
                        appId.toString(),
@@ -1101,6 +1099,16 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
        }
 
        /**
+        * Match file name for "<tt>flink-dist*.jar</tt>" pattern.
+        *
+        * @param fileName file name to check
+        * @return true if file is a dist jar
+        */
+       private static boolean isDistJar(String fileName) {
+               return fileName.startsWith("flink-dist") && 
fileName.endsWith("jar");
+       }
+
+       /**
         * Recursively uploads (and registers) any (user and system) files in 
<tt>shipFiles</tt> except
         * for files matching "<tt>flink-dist*.jar</tt>" which should be 
uploaded separately.
         *
@@ -1128,64 +1136,59 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        ApplicationId appId,
                        List<Path> remotePaths,
                        Map<String, LocalResource> localResources,
-                       StringBuilder envShipFileList) throws IOException, 
URISyntaxException {
-
-               final List<String> classPaths = new ArrayList<>(2 + 
shipFiles.size());
+                       StringBuilder envShipFileList) throws IOException {
+               final List<Path> localPaths = new ArrayList<>();
+               final List<Path> relativePaths = new ArrayList<>();
                for (File shipFile : shipFiles) {
                        if (shipFile.isDirectory()) {
                                // add directories to the classpath
-                               java.nio.file.Path shipPath = shipFile.toPath();
+                               final java.nio.file.Path shipPath = 
shipFile.toPath();
                                final java.nio.file.Path parentPath = 
shipPath.getParent();
-
                                Files.walkFileTree(shipPath, new 
SimpleFileVisitor<java.nio.file.Path>() {
                                        @Override
-                                       public FileVisitResult 
visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
-                                               throws IOException {
-                                               String fileName = 
file.getFileName().toString();
-                                               if 
(!(fileName.startsWith("flink-dist") &&
-                                                               
fileName.endsWith("jar"))) {
-
-                                                       java.nio.file.Path 
relativePath = parentPath.relativize(file);
-
-                                                       String key = 
relativePath.toString();
-                                                       try {
-                                                               Path remotePath 
= setupSingleLocalResource(
-                                                                       key,
-                                                                       fs,
-                                                                       appId,
-                                                                       new 
Path(file.toUri()),
-                                                                       
localResources,
-                                                                       
targetHomeDir,
-                                                                       
relativePath.getParent().toString());
-                                                               
remotePaths.add(remotePath);
-                                                               
envShipFileList.append(key).append("=")
-                                                                       
.append(remotePath).append(",");
-
-                                                               // add files to 
the classpath
-                                                               
classPaths.add(key);
-                                                       } catch 
(URISyntaxException e) {
-                                                               throw new 
IOException(e);
-                                                       }
-                                               }
-
+                                       public FileVisitResult 
visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
+                                               localPaths.add(new 
Path(file.toUri()));
+                                               relativePaths.add(new 
Path(parentPath.relativize(file).toString()));
                                                return FileVisitResult.CONTINUE;
                                        }
                                });
                        } else {
-                               if 
(!(shipFile.getName().startsWith("flink-dist") && 
shipFile.getName().endsWith("jar"))) {
-                                       Path shipLocalPath = new 
Path(shipFile.toURI());
-                                       String key = shipFile.getName();
-                                       Path remotePath = 
setupSingleLocalResource(
-                                               key, fs, appId, shipLocalPath, 
localResources, targetHomeDir, "");
-                                       remotePaths.add(remotePath);
-                                       
envShipFileList.append(key).append("=").append(remotePath).append(",");
-
-                                       // add files to the classpath
-                                       classPaths.add(key);
-                               }
+                               localPaths.add(new Path(shipFile.toURI()));
+                               relativePaths.add(new Path(shipFile.getName()));
                        }
+               }
 
+               final Set<String> archives = new HashSet<>();
+               final Set<String> resources = new HashSet<>();
+               for (int i = 0; i < localPaths.size(); i++) {
+                       final Path localPath = localPaths.get(i);
+                       final Path relativePath = relativePaths.get(i);
+                       if (!isDistJar(relativePath.getName())) {
+                               final String key = relativePath.toString();
+                               final Path remotePath = 
setupSingleLocalResource(
+                                       key,
+                                       fs,
+                                       appId,
+                                       localPath,
+                                       localResources,
+                                       targetHomeDir,
+                                       relativePath.getParent().toString());
+                               remotePaths.add(remotePath);
+                               
envShipFileList.append(key).append("=").append(remotePath).append(",");
+                               // add files to the classpath
+                               if (key.endsWith("jar")) {
+                                       archives.add(relativePath.toString());
+                               } else {
+                                       
resources.add(relativePath.getParent().toString());
+                               }
+                       }
                }
+
+               // construct classpath, we always want resource directories to 
go first, we also sort
+               // both resources and archives in order to make classpath 
deterministic
+               final ArrayList<String> classPaths = new ArrayList<>();
+               resources.stream().sorted().forEach(classPaths::add);
+               archives.stream().sorted().forEach(classPaths::add);
                return classPaths;
        }
 
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 842ebb6..4940f12 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -459,11 +459,11 @@ public class YarnClusterDescriptorTest extends TestLogger 
{
                
testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR);
        }
 
-       public void testEnvironmentDirectoryShipping(String 
environmentVariable) throws Exception {
+       private void testEnvironmentDirectoryShipping(String 
environmentVariable) throws Exception {
                try (YarnClusterDescriptor descriptor = 
createYarnClusterDescriptor()) {
                        File libFolder = 
temporaryFolder.newFolder().getAbsoluteFile();
                        File libFile = new File(libFolder, "libFile.jar");
-                       libFile.createNewFile();
+                       assertTrue(libFile.createNewFile());
 
                        Set<File> effectiveShipFiles = new HashSet<>();
 
@@ -487,7 +487,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
        }
 
        @Test
-       public void testEnvironmentEmptyPluginsShipping() throws Exception {
+       public void testEnvironmentEmptyPluginsShipping() {
                try (YarnClusterDescriptor descriptor = 
createYarnClusterDescriptor()) {
                        File pluginsFolder = 
Paths.get(temporaryFolder.getRoot().getAbsolutePath(), 
"s0m3_p4th_th4t_sh0uld_n0t_3x1sts").toFile();
                        Set<File> effectiveShipFiles = new HashSet<>();
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
index 527782c..d0fad6d 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
@@ -164,6 +165,7 @@ public class YarnFileStageTest extends TestLogger {
                srcFiles.put("2", "Hello 2");
                srcFiles.put("nested/3", "Hello nested/3");
                srcFiles.put("nested/4/5", "Hello nested/4/5");
+               srcFiles.put("test.jar", "JAR Content");
                for (Map.Entry<String, String> src : srcFiles.entrySet()) {
                        File file = new File(srcDir, src.getKey());
                        //noinspection ResultOfMethodCallIgnored
@@ -177,7 +179,7 @@ public class YarnFileStageTest extends TestLogger {
                try {
                        List<Path> remotePaths = new ArrayList<>();
                        HashMap<String, LocalResource> localResources = new 
HashMap<>();
-                       AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
+                       final List<String> classpath = 
AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
                                Collections.singletonList(new 
File(srcPath.toUri().getPath())),
                                targetFileSystem,
                                targetDir,
@@ -185,6 +187,15 @@ public class YarnFileStageTest extends TestLogger {
                                remotePaths,
                                localResources,
                                new StringBuilder());
+
+                       assertThat(
+                               classpath,
+                               Matchers.containsInAnyOrder(
+                                       srcDir.getName(),
+                                       srcDir.getName() + "/nested",
+                                       srcDir.getName() + "/nested/4",
+                                       srcDir.getName() + "/test.jar"));
+
                        assertEquals(srcFiles.size(), localResources.size());
 
                        Path workDir = ConverterUtils

Reply via email to