[ 
https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=934542&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934542
 ]

ASF GitHub Bot logged work on GOBBLIN-2135:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Sep/24 15:09
            Start Date: 12/Sep/24 15:09
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1757074356


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, 
Optional<Map<String, LocalResource>>
     }
 
     for (FileStatus libJarFile : libJarFiles) {
-      Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
-      this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
-      if (resourceMap.isPresent()) {
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, 
libJarFile, unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) 
{
         YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+      } else {
+        LOGGER.warn("Failed to upload jar file {} to HDFS", 
libJarFile.getPath());
       }
     }
   }
-
-  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap,
-      Path destDir, FileSystem localFs) throws IOException {
+  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+      FileSystem localFs) throws IOException {
     for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
       Path srcFilePath = new Path(jarFilePath);
-      Path destFilePath = new Path(destDir, srcFilePath.getName());
-      if (localFs.exists(srcFilePath)) {
-        this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+      FileStatus localJar = localFs.getFileStatus(srcFilePath);
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, 
localJar, unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+        if (resourceMap.isPresent()) {
+          YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        }
       } else {
-        LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
-      }
-      if (resourceMap.isPresent()) {
-        YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);

Review Comment:
   I guess that's fine to start.  how about a `// TODO: decide whether to 
fail-fast here, given the job may be unable to run w/o it`



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, 
Optional<Map<String, LocalResource>>
     }
 
     for (FileStatus libJarFile : libJarFiles) {
-      Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
-      this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
-      if (resourceMap.isPresent()) {
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, 
libJarFile, unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) 
{
         YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+      } else {
+        LOGGER.warn("Failed to upload jar file {} to HDFS", 
libJarFile.getPath());
       }
     }
   }
-
-  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap,
-      Path destDir, FileSystem localFs) throws IOException {
+  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+      FileSystem localFs) throws IOException {
     for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
       Path srcFilePath = new Path(jarFilePath);
-      Path destFilePath = new Path(destDir, srcFilePath.getName());
-      if (localFs.exists(srcFilePath)) {
-        this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+      FileStatus localJar = localFs.getFileStatus(srcFilePath);
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, 
localJar, unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+        if (resourceMap.isPresent()) {
+          YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        }
       } else {
-        LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
-      }
-      if (resourceMap.isPresent()) {
-        YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);

Review Comment:
   I guess that's fine to start.  how about a
   ```
   // TODO: decide whether to fail-fast here, given the job may be unable to 
run w/o it
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 934542)
    Time Spent: 1h 10m  (was: 1h)

> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
>                 Key: GOBBLIN-2135
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality used in 
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of 
> jar caching, where MRJobLauncher creates a monthly cache that is 
> automatically cleaned up by subsequent executions performed 2 months in 
> advance.
> YARN/MR requires uploading jars to HDFS, this step can be quite slow (~15 
> mins for a sizeable job to get all the jars), and given that many jobs do 
> share the same jars, it makes sense to cache them together and only provide 
> YARN the shared path. 
> We also want to ensure that SNAPSHOT jars are other files are not uploaded to 
> a cache, since they are not immutable unlike jar versions on Artifactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to