[ 
https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=933302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933302
 ]

ASF GitHub Bot logged work on GOBBLIN-2135:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Sep/24 06:24
            Start Date: 05/Sep/24 06:24
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1735415804


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java:
##########
@@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, 
Configuration conf) th
     for (String jarFile : SPLITTER.split(jarFileList)) {
       Path srcJarFile = new Path(jarFile);
       FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
       for (FileStatus status : fileStatusList) {
+        Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, 
this.unsharedJarsDir, jarFileDir);
         // For each FileStatus there are chances it could fail in copying at 
the first attempt, due to file-existence
         // or file-copy is ongoing by other job instance since all Gobblin 
jobs share the same jar file directory.
         // the retryCount is to avoid cases (if any) where retry is going too 
far and causes job hanging.
-        int retryCount = 0;
-        boolean shouldFileBeAddedIntoDC = true;
-        Path destJarFile = calculateDestJarFile(status, jarFileDir);
-        // Adding destJarFile into HDFS until it exists and the size of file 
on targetPath matches the one on local path.
-        while (!this.fs.exists(destJarFile) || 
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-          try {
-            if (this.fs.exists(destJarFile) && 
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
-              throw new IOException("Waiting for file to complete on uploading 
... ");
-            }
-            // Set the first parameter as false for not deleting sourceFile
-            // Set the second parameter as false for not overwriting existing 
file on the target, by default it is true.
-            // If the file is preExisted but overwrite flag set to false, then 
an IOException if thrown.
-            this.fs.copyFromLocalFile(false, false, status.getPath(), 
destJarFile);
-          } catch (IOException | InterruptedException e) {
-            LOG.warn("Path:" + destJarFile + " is not copied successfully. 
Will require retry.");
-            retryCount += 1;
-            if (retryCount >= this.jarFileMaximumRetry) {
-              LOG.error("The jar file:" + destJarFile + "failed in being 
copied into hdfs", e);
-              // If retry reaches upper limit, skip copying this file.
-              shouldFileBeAddedIntoDC = false;
-              break;
-            }
-          }
-        }
-        if (shouldFileBeAddedIntoDC) {
+        if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, 
this.jarFileMaximumRetry, destJarFile)) {
           // Then add the jar file on HDFS to the classpath
           LOG.info(String.format("Adding %s to classpath", destJarFile));
           DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+        } else {
+          LOG.error("Failed to upload jar file: " + status.getPath());

Review Comment:
   I don't find the prior code throwing an error...
   
   nonetheless, should everything continue on w/ just some error logs?
   
   shouldn't we instead fail the overall job because presumably necessary jars 
won't be there?
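
For illustration, a minimal sketch (not the PR's actual change) of the fail-fast alternative this comment suggests, reusing the loop variables from the diff above and assuming `java.util.List`/`ArrayList` imports:

```
List<Path> failedJars = new ArrayList<>();
for (FileStatus status : fileStatusList) {
  Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
  if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
    // Jar landed on HDFS with the expected size; add it to the job classpath
    DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
  } else {
    failedJars.add(status.getPath());
  }
}
if (!failedJars.isEmpty()) {
  // Fail the launch up front rather than let the MR job start without required jars
  throw new IOException("Failed to upload jars to HDFS after retries: " + failedJars);
}
```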



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -484,12 +487,29 @@ private void requestContainer(Optional<String> 
preferredNode, Resource resource)
   protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
       throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
+    Path containerJarsUnsharedDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);

Review Comment:
   though you said in the PR desc, suggest a comment here about "unshared" dir 
being for "-SNAPSHOT" versions
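
One possible wording for the requested comment (illustrative only):

```
// "Unshared" dir: per-application jars whose contents may differ between executions,
// e.g. "-SNAPSHOT" builds, must not be placed in the shared jar cache.
Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
```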



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle 
concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+  /**
+   * Calculate the target filePath of the jar file to be copied on HDFS,
+   * given the {@link FileStatus} of a jarFile and the path of directory that 
contains jar.
+   * Snapshot dirs should not be shared, as different jobs may be using 
different versions of it.
+   * @param fs
+   * @param localJar
+   * @param unsharedJarsDir
+   * @param jarCacheDir
+   * @return
+   * @throws IOException
+   */
+  public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, 
Path unsharedJarsDir, Path jarCacheDir) throws IOException {

Review Comment:
   suggest to name `calculateDestJarFilePath`.
   
   and, since the only use is `localJar.getPath().getName()` suggest to make 
the param `String jarName`
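
A hedged sketch of the suggested rename and slimmer parameter (hypothetical, not the PR's code):

```
public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir) {
  // SNAPSHOT jars are mutable, so keep them out of the shared cache dir
  Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
  return new Path(fs.makeQualified(uploadDir), jarName);
}
```

Callers would then pass `localJar.getPath().getName()` at the call site.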



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -300,10 +304,10 @@ public GobblinYarnAppLauncher(Config config, 
YarnConfiguration yarnConfiguration
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
 
     this.detachOnExitEnabled = ConfigUtils
-        .getBoolean(config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
+        .getBoolean(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
             GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT);
-    this.appLauncherMode = ConfigUtils.getString(config, 
GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
-
+    this.appLauncherMode = ConfigUtils.getString(this.config, 
GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
+    this.jarCacheEnabled = ConfigUtils.getBoolean(config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);

Review Comment:
   NBD, but you just updated the two above to be `this.config`, but only use 
`config` here :)



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle 
concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;

Review Comment:
   are these seconds?  suggest a suffix to clarify
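
The value feeds `Thread.sleep(long millis)` below, so it is milliseconds (3 seconds). A sketch of the suffix being asked for, which would also fix the "IMCOMPLETE" typo:

```
private static final long WAIT_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000L; // 3 seconds
```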



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java:
##########
@@ -199,6 +203,18 @@ public static void setYarnClassPath(Config config, 
Configuration yarnConfigurati
     }
   }
 
+  public static Path getJarPathCacheAndCleanIfNeeded(Config config, FileSystem 
fs) throws IOException {

Review Comment:
   this feels like two separate operations:
   ```
   Path calcJarCacheCurrentPath(Config, FileSystem);
   boolean retainKLatestCachePaths(Path parentCachePath, int k);  /// true iff 
any deletion
   ```
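
A hedged sketch of the retention half under the names proposed above (illustrative; assumes `java.util.Arrays`/`Comparator` imports alongside the existing Hadoop ones, with the path-derivation half keeping the current logic):

```
/**
 * Deletes all but the k lexicographically-latest children of parentCachePath.
 * @return true iff any child was deleted
 */
public static boolean retainKLatestCachePaths(FileSystem fs, Path parentCachePath, int k) throws IOException {
  FileStatus[] children = fs.listStatus(parentCachePath);
  Arrays.sort(children, Comparator.comparing((FileStatus status) -> status.getPath().getName()));
  boolean deletedAny = false;
  for (int i = 0; i < children.length - k; i++) {
    deletedAny |= fs.delete(children[i].getPath(), true); // recursively remove stale cache dirs
  }
  return deletedAny;
}
```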



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle 
concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+  /**
+   * Calculate the target filePath of the jar file to be copied on HDFS,
+   * given the {@link FileStatus} of a jarFile and the path of directory that 
contains jar.
+   * Snapshot dirs should not be shared, as different jobs may be using 
different versions of it.
+   * @param fs
+   * @param localJar
+   * @param unsharedJarsDir
+   * @param jarCacheDir
+   * @return
+   * @throws IOException
+   */
+  public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, 
Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+    Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? 
unsharedJarsDir : jarCacheDir;
+    Path destJarFile = new Path(fs.makeQualified(uploadDir), 
localJar.getPath().getName());
+    return destJarFile;
+  }
+  /**
+   * Upload a jar file to HDFS with retries to handle already existing jars
+   * @param fs
+   * @param localJar
+   * @param destJarFile
+   * @param jarFileMaximumRetry
+   * @return
+   * @throws IOException
+   */
+  public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, 
int jarFileMaximumRetry, Path destJarFile) throws IOException {

Review Comment:
   `jarFileMaximumRetry`  => simply `maxAttempts`?



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle 
concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+  /**
+   * Calculate the target filePath of the jar file to be copied on HDFS,
+   * given the {@link FileStatus} of a jarFile and the path of directory that 
contains jar.
+   * Snapshot dirs should not be shared, as different jobs may be using 
different versions of it.
+   * @param fs
+   * @param localJar
+   * @param unsharedJarsDir
+   * @param jarCacheDir
+   * @return
+   * @throws IOException
+   */
+  public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, 
Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+    Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? 
unsharedJarsDir : jarCacheDir;
+    Path destJarFile = new Path(fs.makeQualified(uploadDir), 
localJar.getPath().getName());
+    return destJarFile;
+  }
+  /**
+   * Upload a jar file to HDFS with retries to handle already existing jars
+   * @param fs
+   * @param localJar
+   * @param destJarFile
+   * @param jarFileMaximumRetry
+   * @return
+   * @throws IOException
+   */
+  public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, 
int jarFileMaximumRetry, Path destJarFile) throws IOException {
+    int retryCount = 0;
+    while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() 
!= localJar.getLen()) {
+      try {
+        if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() 
!= localJar.getLen()) {
+          Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);

Review Comment:
   what are we waiting on here?  is `copyFromLocalFile` asynchronous?  or are 
we thinking a different process may be doing upload.
   
   with `overwriteAnyExistingDestFile == false`, we don't seem to undertake any 
"repair", do we?  instead we just wait `(waitTime * jarFileMaximumRetry)`



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -640,31 +643,36 @@ private Resource 
prepareContainerResource(GetNewApplicationResponse newApplicati
 
   private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId 
applicationId) throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
+    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : 
appWorkDir;
 
     Path appMasterWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
-    LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", 
appMasterWorkDir.toString());
+    Path appMasterJarsCacheDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
+    LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", 
appMasterWorkDir);
+    LOGGER.info("Configured GobblinApplicationMaster jars directory to: {}", 
appMasterJarsCacheDir);
 
     Map<String, LocalResource> appMasterResources = Maps.newHashMap();
     FileSystem localFs = FileSystem.getLocal(new Configuration());
 
-    // NOTE: log after each step below for insight into what takes bulk of time
     if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
-      Path libJarsDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+      // Lib jars are shared between all containers, store at the root level
+      Path libJarsDestDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+      Path unsharedJarsDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
       addLibJars(new 
Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)),
-          Optional.of(appMasterResources), libJarsDestDir, localFs);
-      LOGGER.info("Added lib jars to directory: {}", 
libJarsDestDir.toString());
+          Optional.of(appMasterResources), libJarsDestDir, 
unsharedJarsDestDir, localFs);
+      LOGGER.info("Added lib jars to directory: {}", libJarsDestDir);

Review Comment:
   it seems `addLibJars` could also add some to `unsharedJarsDestDir`, so good 
to mention that while logging
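
e.g. a combined log line along these lines (illustrative):

```
LOGGER.info("Added lib jars to cache directory: {} (unshared/SNAPSHOT jars to: {})",
    libJarsDestDir, unsharedJarsDestDir);
```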



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle 
concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+  /**
+   * Calculate the target filePath of the jar file to be copied on HDFS,
+   * given the {@link FileStatus} of a jarFile and the path of directory that 
contains jar.
+   * Snapshot dirs should not be shared, as different jobs may be using 
different versions of it.
+   * @param fs
+   * @param localJar
+   * @param unsharedJarsDir
+   * @param jarCacheDir
+   * @return
+   * @throws IOException
+   */
+  public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, 
Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+    Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? 
unsharedJarsDir : jarCacheDir;
+    Path destJarFile = new Path(fs.makeQualified(uploadDir), 
localJar.getPath().getName());
+    return destJarFile;
+  }
+  /**
+   * Upload a jar file to HDFS with retries to handle already existing jars
+   * @param fs
+   * @param localJar
+   * @param destJarFile
+   * @param jarFileMaximumRetry
+   * @return
+   * @throws IOException
+   */
+  public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, 
int jarFileMaximumRetry, Path destJarFile) throws IOException {
+    int retryCount = 0;
+    while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() 
!= localJar.getLen()) {
+      try {
+        if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() 
!= localJar.getLen()) {
+          Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
+          throw new IOException("Waiting for file to complete on uploading ... 
");
+        }
+        // Set the first parameter as false for not deleting sourceFile
+        // Set the second parameter as false for not overwriting existing file 
on the target, by default it is true.
+        // If the file is preExisted but overwrite flag set to false, then an 
IOException if thrown.
+        fs.copyFromLocalFile(false, false, localJar.getPath(), destJarFile);

Review Comment:
   ```
   boolean deleteSourceFile = false;
   boolean overwriteAnyExistingDestFile = false; // IOException will be thrown 
if does already exist
   fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, 
localJar.getPath(), destJarFile)
   ```
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -484,12 +487,29 @@ private void requestContainer(Optional<String> 
preferredNode, Resource resource)
   protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
       throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
+    Path containerJarsUnsharedDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : 
appWorkDir;
+    Path containerJarsCachedDir = new Path(jarCacheDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
+    LOGGER.info("Container uncached jars root dir: " + 
containerJarsUnsharedDir);

Review Comment:
   I'd find something like "execution-private" or "unshared" more explicit than 
uncached.
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -484,12 +487,29 @@ private void requestContainer(Optional<String> 
preferredNode, Resource resource)
   protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
       throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
+    Path containerJarsUnsharedDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : 
appWorkDir;
+    Path containerJarsCachedDir = new Path(jarCacheDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
+    LOGGER.info("Container uncached jars root dir: " + 
containerJarsUnsharedDir);
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
 
-    Map<String, LocalResource> resourceMap = Maps.newHashMap();
 
+    Map<String, LocalResource> resourceMap = Maps.newHashMap();
+    // Always fetch any jars from the appWorkDir for any potential snapshot 
jars
     addContainerLocalResources(new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
-    addContainerLocalResources(new Path(containerWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
+    if 
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
+      addContainerLocalResources(new Path(containerJarsUnsharedDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
+          resourceMap);
+    }
+    if (this.jarCacheEnabled) {
+      addContainerLocalResources(new Path(jarCacheDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
+      if 
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {

Review Comment:
   I don't quite understand this key.  you're checking it here in two different 
conditionals, but in neither one do you actually use (or even check to see) 
what value it holds
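
An illustrative restructuring that makes the point concrete: hoisting the presence check so it reads once also highlights that only `hasPath()` is consulted and the key's value is never read:

```
boolean hasLocalContainerFiles = this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY);
if (hasLocalContainerFiles) {
  addContainerLocalResources(new Path(containerJarsUnsharedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
}
if (this.jarCacheEnabled) {
  addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
  // ... the second hasLocalContainerFiles branch from the diff would follow here ...
}
```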



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, 
Optional<Map<String, LocalResource>>
     }
 
     for (FileStatus libJarFile : libJarFiles) {
-      Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
-      this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
-      if (resourceMap.isPresent()) {
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, 
libJarFile, unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) 
{
         YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+      } else {
+        LOGGER.warn("Failed to upload jar file {} to HDFS", 
libJarFile.getPath());
       }
     }
   }
-
-  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap,
-      Path destDir, FileSystem localFs) throws IOException {
+  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+      FileSystem localFs) throws IOException {
     for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
       Path srcFilePath = new Path(jarFilePath);
-      Path destFilePath = new Path(destDir, srcFilePath.getName());
-      if (localFs.exists(srcFilePath)) {
-        this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+      FileStatus localJar = localFs.getFileStatus(srcFilePath);
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, 
localJar, unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+        if (resourceMap.isPresent()) {
+          YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        }
       } else {
-        LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
-      }
-      if (resourceMap.isPresent()) {
-        YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);

Review Comment:
   again, should this be an actual failure, not merely logging?
   
   ...or do we believe there are times when it's actually OK to continue?
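
A sketch of the stricter alternative (illustrative only), surfacing the failure instead of warning and continuing; `addAppJars` already declares `throws IOException`:

```
if (!HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
  throw new IOException("Failed to upload app jar " + srcFilePath + " to " + destFilePath);
}
if (resourceMap.isPresent()) {
  YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
}
```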





Issue Time Tracking
-------------------

    Worklog Id:     (was: 933302)
    Time Spent: 20m  (was: 10m)

> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
>                 Key: GOBBLIN-2135
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality available in 
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of 
> jar caching: MRJobLauncher creates a monthly jar cache that subsequent 
> executions automatically clean up after roughly two months.
> YARN/MR requires uploading jars to HDFS, and this step can be quite slow 
> (~15 minutes for a sizeable job to stage all of its jars). Given that many 
> jobs share the same jars, it makes sense to cache them in a shared location 
> and provide YARN only the shared path.
> We also want to ensure that SNAPSHOT jars and other mutable files are not 
> uploaded to the cache, since, unlike released jar versions on Artifactory, 
> they are not immutable.
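
A hypothetical illustration of the month-granular cache layout described above (the names and paths are made up; the actual Gobblin config keys and directories may differ):

```
// e.g. /gobblin/yarn/jar-cache/2024-09 for the current month
Path cacheRoot = new Path("/gobblin/yarn/jar-cache");
String currentMonth = new SimpleDateFormat("yyyy-MM").format(new Date());
Path monthlyCacheDir = new Path(cacheRoot, currentMonth);
// Subsequent launches upload shared (non-SNAPSHOT) jars here and delete month
// directories older than the retention window (e.g. two months).
```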



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
