[ https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=934542&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934542 ]
ASF GitHub Bot logged work on GOBBLIN-2135: ------------------------------------------- Author: ASF GitHub Bot Created on: 12/Sep/24 15:09 Start Date: 12/Sep/24 15:09 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4030: URL: https://github.com/apache/gobblin/pull/4030#discussion_r1757074356 ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> } for (FileStatus libJarFile : libJarFiles) { - Path destFilePath = new Path(destDir, libJarFile.getPath().getName()); - this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath); - if (resourceMap.isPresent()) { + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) { YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } else { + LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath()); } } } - - private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, - Path destDir, FileSystem localFs) throws IOException { + private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir, + FileSystem localFs) throws IOException { for (String jarFilePath : SPLITTER.split(jarFilePathList)) { Path srcFilePath = new Path(jarFilePath); - Path destFilePath = new Path(destDir, srcFilePath.getName()); - if (localFs.exists(srcFilePath)) { - this.fs.copyFromLocalFile(srcFilePath, destFilePath); + FileStatus localJar = localFs.getFileStatus(srcFilePath); + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, 
localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) { + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } } else { - LOGGER.warn("The src destination " + srcFilePath + " doesn't exists"); - } - if (resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath); Review Comment: I guess that's fine to start. how about a `// TODO: decide whether to fail-fast here, given the job may be unable to run w/o it` ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> } for (FileStatus libJarFile : libJarFiles) { - Path destFilePath = new Path(destDir, libJarFile.getPath().getName()); - this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath); - if (resourceMap.isPresent()) { + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) { YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } else { + LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath()); } } } - - private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, - Path destDir, FileSystem localFs) throws IOException { + private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir, + FileSystem localFs) throws IOException { for (String jarFilePath : SPLITTER.split(jarFilePathList)) { Path srcFilePath = new Path(jarFilePath); - Path destFilePath = new Path(destDir, 
srcFilePath.getName()); - if (localFs.exists(srcFilePath)) { - this.fs.copyFromLocalFile(srcFilePath, destFilePath); + FileStatus localJar = localFs.getFileStatus(srcFilePath); + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) { + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } } else { - LOGGER.warn("The src destination " + srcFilePath + " doesn't exists"); - } - if (resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath); Review Comment: I guess that's fine to start. how about a ``` // TODO: decide whether to fail-fast here, given the job may be unable to run w/o it ``` Issue Time Tracking ------------------- Worklog Id: (was: 934542) Time Spent: 1h 10m (was: 1h) > Cache Yarn jars in GobblinYarnAppLauncher > ----------------------------------------- > > Key: GOBBLIN-2135 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2135 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: William Lo > Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > Gobblin YARN Application Launcher lacks some functionality used in > MRJobLauncher. One of the biggest gaps in feature parity is the absence of > jar caching, where MRJobLauncher creates a monthly cache that is > automatically cleaned up by subsequent executions performed 2 months in > advance. > YARN/MR requires uploading jars to HDFS, this step can be quite slow (~15 > mins for a sizeable job to get all the jars), and given that many jobs do > share the same jars, it makes sense to cache them together and only provide > YARN the shared path. 
> We also want to ensure that SNAPSHOT jars and other files are not uploaded to > a cache, since they are not immutable, unlike jar versions on Artifactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)