phet commented on code in PR #4030: URL: https://github.com/apache/gobblin/pull/4030#discussion_r1757074356
########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> } for (FileStatus libJarFile : libJarFiles) { - Path destFilePath = new Path(destDir, libJarFile.getPath().getName()); - this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath); - if (resourceMap.isPresent()) { + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) { YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } else { + LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath()); } } } - - private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, - Path destDir, FileSystem localFs) throws IOException { + private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir, + FileSystem localFs) throws IOException { for (String jarFilePath : SPLITTER.split(jarFilePathList)) { Path srcFilePath = new Path(jarFilePath); - Path destFilePath = new Path(destDir, srcFilePath.getName()); - if (localFs.exists(srcFilePath)) { - this.fs.copyFromLocalFile(srcFilePath, destFilePath); + FileStatus localJar = localFs.getFileStatus(srcFilePath); + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) { + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } } else { - LOGGER.warn("The src destination " + srcFilePath + " doesn't exists"); - } - if 
(resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath); Review Comment: I guess that's fine to start. how about a `// TODO: decide whether to fail-fast here, given the job may be unable to run w/o it` ########## gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java: ########## @@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> } for (FileStatus libJarFile : libJarFiles) { - Path destFilePath = new Path(destDir, libJarFile.getPath().getName()); - this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath); - if (resourceMap.isPresent()) { + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) { YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } else { + LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath()); } } } - - private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, - Path destDir, FileSystem localFs) throws IOException { + private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir, + FileSystem localFs) throws IOException { for (String jarFilePath : SPLITTER.split(jarFilePathList)) { Path srcFilePath = new Path(jarFilePath); - Path destFilePath = new Path(destDir, srcFilePath.getName()); - if (localFs.exists(srcFilePath)) { - this.fs.copyFromLocalFile(srcFilePath, destFilePath); + FileStatus localJar = localFs.getFileStatus(srcFilePath); + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir); + if 
(HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) { + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } } else { - LOGGER.warn("The src destination " + srcFilePath + " doesn't exists"); - } - if (resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath); Review Comment: I guess that's fine to start. How about adding a ``` // TODO: decide whether to fail-fast here, given the job may be unable to run w/o it ``` comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org