This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 17e2747 [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship. 17e2747 is described below commit 17e2747e575cc4d9847be76b5ce9a75a7b6707f7 Author: David Moravek <david.mora...@firma.seznam.cz> AuthorDate: Fri Jul 26 17:04:57 2019 +0200 [FLINK-13127][yarn] Add support for resource classloading for resources shipped using --yarnship. This closes #9022. --- .../flink/yarn/AbstractYarnClusterDescriptor.java | 99 +++++++++++----------- .../flink/yarn/YarnClusterDescriptorTest.java | 6 +- .../org/apache/flink/yarn/YarnFileStageTest.java | 13 ++- 3 files changed, 66 insertions(+), 52 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 5b2bc2a..23d1d3a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -83,7 +83,6 @@ import java.io.ObjectOutputStream; import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URISyntaxException; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.SimpleFileVisitor; @@ -1086,8 +1085,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor Path localSrcPath, Map<String, LocalResource> localResources, Path targetHomeDir, - String relativeTargetPath) throws IOException, URISyntaxException { - + String relativeTargetPath) throws IOException { Tuple2<Path, LocalResource> resource = Utils.setupLocalResource( fs, appId.toString(), @@ -1101,6 +1099,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } /** + * Match file name for "<tt>flink-dist*.jar</tt>" pattern. 
+ * + * @param fileName file name to check + * @return true if file is a dist jar + */ + private static boolean isDistJar(String fileName) { + return fileName.startsWith("flink-dist") && fileName.endsWith("jar"); + } + + /** * Recursively uploads (and registers) any (user and system) files in <tt>shipFiles</tt> except * for files matching "<tt>flink-dist*.jar</tt>" which should be uploaded separately. * @@ -1128,64 +1136,59 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor ApplicationId appId, List<Path> remotePaths, Map<String, LocalResource> localResources, - StringBuilder envShipFileList) throws IOException, URISyntaxException { - - final List<String> classPaths = new ArrayList<>(2 + shipFiles.size()); + StringBuilder envShipFileList) throws IOException { + final List<Path> localPaths = new ArrayList<>(); + final List<Path> relativePaths = new ArrayList<>(); for (File shipFile : shipFiles) { if (shipFile.isDirectory()) { // add directories to the classpath - java.nio.file.Path shipPath = shipFile.toPath(); + final java.nio.file.Path shipPath = shipFile.toPath(); final java.nio.file.Path parentPath = shipPath.getParent(); - Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() { @Override - public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) - throws IOException { - String fileName = file.getFileName().toString(); - if (!(fileName.startsWith("flink-dist") && - fileName.endsWith("jar"))) { - - java.nio.file.Path relativePath = parentPath.relativize(file); - - String key = relativePath.toString(); - try { - Path remotePath = setupSingleLocalResource( - key, - fs, - appId, - new Path(file.toUri()), - localResources, - targetHomeDir, - relativePath.getParent().toString()); - remotePaths.add(remotePath); - envShipFileList.append(key).append("=") - .append(remotePath).append(","); - - // add files to the classpath - classPaths.add(key); - } catch (URISyntaxException e) { - throw new 
IOException(e); - } - } - + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) { + localPaths.add(new Path(file.toUri())); + relativePaths.add(new Path(parentPath.relativize(file).toString())); return FileVisitResult.CONTINUE; } }); } else { - if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) { - Path shipLocalPath = new Path(shipFile.toURI()); - String key = shipFile.getName(); - Path remotePath = setupSingleLocalResource( - key, fs, appId, shipLocalPath, localResources, targetHomeDir, ""); - remotePaths.add(remotePath); - envShipFileList.append(key).append("=").append(remotePath).append(","); - - // add files to the classpath - classPaths.add(key); - } + localPaths.add(new Path(shipFile.toURI())); + relativePaths.add(new Path(shipFile.getName())); } + } + final Set<String> archives = new HashSet<>(); + final Set<String> resources = new HashSet<>(); + for (int i = 0; i < localPaths.size(); i++) { + final Path localPath = localPaths.get(i); + final Path relativePath = relativePaths.get(i); + if (!isDistJar(relativePath.getName())) { + final String key = relativePath.toString(); + final Path remotePath = setupSingleLocalResource( + key, + fs, + appId, + localPath, + localResources, + targetHomeDir, + relativePath.getParent().toString()); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=").append(remotePath).append(","); + // add files to the classpath + if (key.endsWith("jar")) { + archives.add(relativePath.toString()); + } else { + resources.add(relativePath.getParent().toString()); + } + } } + + // construct classpath, we always want resource directories to go first, we also sort + // both resources and archives in order to make classpath deterministic + final ArrayList<String> classPaths = new ArrayList<>(); + resources.stream().sorted().forEach(classPaths::add); + archives.stream().sorted().forEach(classPaths::add); return classPaths; } diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 842ebb6..4940f12 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -459,11 +459,11 @@ public class YarnClusterDescriptorTest extends TestLogger { testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR); } - public void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception { + private void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception { try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor()) { File libFolder = temporaryFolder.newFolder().getAbsoluteFile(); File libFile = new File(libFolder, "libFile.jar"); - libFile.createNewFile(); + assertTrue(libFile.createNewFile()); Set<File> effectiveShipFiles = new HashSet<>(); @@ -487,7 +487,7 @@ public class YarnClusterDescriptorTest extends TestLogger { } @Test - public void testEnvironmentEmptyPluginsShipping() throws Exception { + public void testEnvironmentEmptyPluginsShipping() { try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor()) { File pluginsFolder = Paths.get(temporaryFolder.getRoot().getAbsolutePath(), "s0m3_p4th_th4t_sh0uld_n0t_3x1sts").toFile(); Set<File> effectiveShipFiles = new HashSet<>(); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java index 527782c..d0fad6d 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import 
org.apache.hadoop.yarn.util.ConverterUtils; +import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.Assume; import org.junit.Before; @@ -164,6 +165,7 @@ public class YarnFileStageTest extends TestLogger { srcFiles.put("2", "Hello 2"); srcFiles.put("nested/3", "Hello nested/3"); srcFiles.put("nested/4/5", "Hello nested/4/5"); + srcFiles.put("test.jar", "JAR Content"); for (Map.Entry<String, String> src : srcFiles.entrySet()) { File file = new File(srcDir, src.getKey()); //noinspection ResultOfMethodCallIgnored @@ -177,7 +179,7 @@ public class YarnFileStageTest extends TestLogger { try { List<Path> remotePaths = new ArrayList<>(); HashMap<String, LocalResource> localResources = new HashMap<>(); - AbstractYarnClusterDescriptor.uploadAndRegisterFiles( + final List<String> classpath = AbstractYarnClusterDescriptor.uploadAndRegisterFiles( Collections.singletonList(new File(srcPath.toUri().getPath())), targetFileSystem, targetDir, @@ -185,6 +187,15 @@ public class YarnFileStageTest extends TestLogger { remotePaths, localResources, new StringBuilder()); + + assertThat( + classpath, + Matchers.containsInAnyOrder( + srcDir.getName(), + srcDir.getName() + "/nested", + srcDir.getName() + "/nested/4", + srcDir.getName() + "/test.jar")); + assertEquals(srcFiles.size(), localResources.size()); Path workDir = ConverterUtils