[ https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=934343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934343 ]
ASF GitHub Bot logged work on GOBBLIN-2135:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Sep/24 19:52
            Start Date: 11/Sep/24 19:52
    Worklog Time Spent: 10m
      Work Description:
Will-Lo commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1755496938

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java:
##########
@@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
     for (String jarFile : SPLITTER.split(jarFileList)) {
       Path srcJarFile = new Path(jarFile);
       FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
       for (FileStatus status : fileStatusList) {
+        Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
         // For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
         // or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
         // the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
-        int retryCount = 0;
-        boolean shouldFileBeAddedIntoDC = true;
-        Path destJarFile = calculateDestJarFile(status, jarFileDir);
-        // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
-        while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-          try {
-            if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
-              throw new IOException("Waiting for file to complete on uploading ... ");
-            }
-            // Set the first parameter as false for not deleting sourceFile
-            // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
-            // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
-            this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
-          } catch (IOException | InterruptedException e) {
-            LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
-            retryCount += 1;
-            if (retryCount >= this.jarFileMaximumRetry) {
-              LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
-              // If retry reaches upper limit, skip copying this file.
-              shouldFileBeAddedIntoDC = false;
-              break;
-            }
-          }
-        }
-        if (shouldFileBeAddedIntoDC) {
+        if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
           // Then add the jar file on HDFS to the classpath
           LOG.info(String.format("Adding %s to classpath", destJarFile));
           DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+        } else {
+          LOG.error("Failed to upload jar file: " + status.getPath());

Review Comment:
   Possibly we should throw an error, but I think given that another job could be uploading the same jars though it might be better to let the job attempt to try and run, if that job fails it should be emitting the failed event anyways.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 934343)
    Time Spent: 0.5h  (was: 20m)

> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
>                 Key: GOBBLIN-2135
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality used in
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of
> jar caching, where MRJobLauncher creates a monthly cache that is
> automatically cleaned up by subsequent executions performed 2 months in
> advance.
> YARN/MR requires uploading jars to HDFS; this step can be quite slow (~15
> mins for a sizeable job to get all the jars), and given that many jobs do
> share the same jars, it makes sense to cache them together and only provide
> YARN the shared path.
> We also want to ensure that SNAPSHOT jars and other files are not uploaded
> to a cache, since they are not immutable, unlike jar versions on Artifactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
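
The caching rule in the issue description (immutable release jars go to a shared monthly cache, while SNAPSHOT jars and other files stay out of it) could be sketched roughly as below. This is only an illustration under stated assumptions: the class JarCachePathSketch, its methods, the "SNAPSHOT" substring check, the yyyy-MM directory naming, and the example paths are all hypothetical and are not taken from HdfsJarUploadUtils or the PR.

```java
import java.time.YearMonth;

/** Hypothetical sketch; NOT the actual HdfsJarUploadUtils implementation. */
final class JarCachePathSketch {

  private JarCachePathSketch() {}

  /**
   * Assumption: a file is safe to share only if it is a jar whose name does
   * not mark it as a SNAPSHOT build, since SNAPSHOT artifacts are mutable.
   */
  static boolean isCacheable(String fileName) {
    return fileName.endsWith(".jar") && !fileName.contains("SNAPSHOT");
  }

  /**
   * Picks a destination path: cacheable jars go under a per-month directory
   * of the shared cache root; everything else goes to the job's own
   * unshared directory.
   */
  static String destinationFor(String fileName, String cacheRoot, String unsharedDir, YearMonth month) {
    // YearMonth.toString() yields the ISO form, e.g. "2024-09".
    String dir = isCacheable(fileName) ? cacheRoot + "/" + month : unsharedDir;
    return dir + "/" + fileName;
  }

  public static void main(String[] args) {
    YearMonth month = YearMonth.of(2024, 9);
    // Example paths are made up for illustration.
    System.out.println(destinationFor("guava-32.1.2.jar", "/gobblin/jars", "/gobblin/job1/_jars", month));
    System.out.println(destinationFor("my-job-0.1-SNAPSHOT.jar", "/gobblin/jars", "/gobblin/job1/_jars", month));
  }
}
```

Keying the shared directory by month is what would let a later run delete whole directories older than some cutoff, which is one plausible reading of the cleanup behaviour described in the issue.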