This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a07a4f190d [GOBBLIN-2135] Cache Gobblin YARN application jars (#4030)
a07a4f190d is described below

commit a07a4f190d7128f48ba3d956efd4c1b9cf9158f4
Author: William Lo <[email protected]>
AuthorDate: Tue Sep 17 17:15:03 2024 -0400

    [GOBBLIN-2135] Cache Gobblin YARN application jars (#4030)
    
    * Implement caching for the Gobblin YARN app launcher so that jobs do not repeatedly upload jars and files to HDFS
---
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   |  45 +-------
 .../apache/gobblin/temporal/yarn/YarnService.java  |  28 ++++-
 .../util/filesystem/HdfsJarUploadUtils.java        |  85 +++++++++++++++
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 117 +++++++++++++--------
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   9 ++
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  38 +++++++
 .../apache/gobblin/yarn/YarnHelixUtilsTest.java    |  48 +++++++++
 7 files changed, 279 insertions(+), 91 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 7b01dc1a6b..b5e7f09740 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -114,6 +114,7 @@ import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
 import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job as a 
Hadoop MR job.
@@ -154,8 +155,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
   // Configuration that make uploading of jar files more reliable,
   // since multiple Gobblin Jobs are sharing the same jar directory.
   private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
-  private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
-
   public static final String MR_TYPE_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
   public static final String MAPPER_TASK_NUM_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
   public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + 
"reporting.mapper.task.attempt.num";
@@ -572,56 +571,22 @@ public class MRJobLauncher extends AbstractJobLauncher {
     for (String jarFile : SPLITTER.split(jarFileList)) {
       Path srcJarFile = new Path(jarFile);
       FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
       for (FileStatus status : fileStatusList) {
+        Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs, 
status.getPath().getName(), this.unsharedJarsDir, jarFileDir);
         // For each FileStatus there are chances it could fail in copying at 
the first attempt, due to file-existence
         // or file-copy is ongoing by other job instance since all Gobblin 
jobs share the same jar file directory.
         // the retryCount is to avoid cases (if any) where retry is going too 
far and causes job hanging.
-        int retryCount = 0;
-        boolean shouldFileBeAddedIntoDC = true;
-        Path destJarFile = calculateDestJarFile(status, jarFileDir);
-        // Adding destJarFile into HDFS until it exists and the size of file 
on targetPath matches the one on local path.
-        while (!this.fs.exists(destJarFile) || 
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-          try {
-            if (this.fs.exists(destJarFile) && 
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
-              throw new IOException("Waiting for file to complete on uploading 
... ");
-            }
-            // Set the first parameter as false for not deleting sourceFile
-            // Set the second parameter as false for not overwriting existing 
file on the target, by default it is true.
-            // If the file is preExisted but overwrite flag set to false, then 
an IOException if thrown.
-            this.fs.copyFromLocalFile(false, false, status.getPath(), 
destJarFile);
-          } catch (IOException | InterruptedException e) {
-            LOG.warn("Path:" + destJarFile + " is not copied successfully. 
Will require retry.");
-            retryCount += 1;
-            if (retryCount >= this.jarFileMaximumRetry) {
-              LOG.error("The jar file:" + destJarFile + "failed in being 
copied into hdfs", e);
-              // If retry reaches upper limit, skip copying this file.
-              shouldFileBeAddedIntoDC = false;
-              break;
-            }
-          }
-        }
-        if (shouldFileBeAddedIntoDC) {
+        if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, 
this.jarFileMaximumRetry, destJarFile)) {
           // Then add the jar file on HDFS to the classpath
           LOG.info(String.format("Adding %s to classpath", destJarFile));
           DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+        } else {
+          LOG.error("Failed to upload jar file: " + status.getPath());
         }
       }
     }
   }
 
-  /**
-   * Calculate the target filePath of the jar file to be copied on HDFS,
-   * given the {@link FileStatus} of a jarFile and the path of directory that 
contains jar.
-   */
-  private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
-    // SNAPSHOT jars should not be shared, as different jobs may be using 
different versions of it
-    Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? 
this.unsharedJarsDir : jarFileDir;
-    // DistributedCache requires absolute path, so we need to use 
makeQualified.
-    return new Path(this.fs.makeQualified(baseDir), 
status.getPath().getName());
-  }
-
   /**
    * Add local non-jar files the job depends on to DistributedCache.
    */
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index b124063bcd..426521b921 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -197,6 +197,8 @@ class YarnService extends AbstractIdleService {
 
   private volatile boolean shutdownInProgress = false;
 
+  private final boolean jarCacheEnabled;
+
   public YarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus) throws Exception {
     this.applicationName = applicationName;
@@ -270,6 +272,7 @@ class YarnService extends AbstractIdleService {
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
+    this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
   }
 
   @SuppressWarnings("unused")
@@ -484,12 +487,30 @@ class YarnService extends AbstractIdleService {
   protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
       throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
+    // Used for -SNAPSHOT versions of jars
+    Path containerJarsUnsharedDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+    Path containerJarsCachedDir = new Path(jarCacheDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
+    LOGGER.info("Container execution-private jars root dir: " + 
containerJarsUnsharedDir);
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
 
-    Map<String, LocalResource> resourceMap = Maps.newHashMap();
 
+    Map<String, LocalResource> resourceMap = Maps.newHashMap();
+    // Always fetch any jars from the appWorkDir for any potential snapshot 
jars
     addContainerLocalResources(new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
-    addContainerLocalResources(new Path(containerWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
+    if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
+      addContainerLocalResources(new Path(containerJarsUnsharedDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
+          resourceMap);
+    }
+    if (this.jarCacheEnabled) {
+      addContainerLocalResources(new Path(jarCacheDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
+      if 
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
+        addContainerLocalResources(new Path(containerJarsCachedDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
+            resourceMap);
+      }
+    }
+
     addContainerLocalResources(
         new Path(containerWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);
 
@@ -579,8 +600,6 @@ class YarnService extends AbstractIdleService {
       containerCommand.append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
           .append(" ").append(helixInstanceTag);
     }
-
-    LOGGER.info("Building " + containerProcessName);
     return containerCommand.append(" 
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
             
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
         .append(" 
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
@@ -797,7 +816,6 @@ class YarnService extends AbstractIdleService {
           public void run() {
             try {
               LOGGER.info("Starting container " + containerId);
-
               nmClientAsync.startContainerAsync(container, 
newContainerLaunchContext(containerInfo));
             } catch (IOException ioe) {
               LOGGER.error("Failed to start container " + containerId, ioe);
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
new file mode 100644
index 0000000000..31128a1cf1
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS, with retries to tolerate concurrent uploads of the same jar.
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000;
+
+  /**
+   * Calculate the qualified HDFS path a jar should be uploaded to. SNAPSHOT jars go to {@code unsharedJarsDir}, as
+   * different jobs may be using different versions of them; all other jars go to the shared {@code jarCacheDir}.
+   * @param fs file system used to qualify the destination directory (DistributedCache requires absolute paths)
+   * @param jarName file name of the jar; a name containing "SNAPSHOT" selects {@code unsharedJarsDir}
+   * @param unsharedJarsDir execution-private directory for SNAPSHOT jars
+   * @param jarCacheDir shared directory for all other jars
+   * @return the qualified destination path of the jar
+   */
+  public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+    Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
+    return new Path(fs.makeQualified(uploadDir), jarName);
+  }
+  /**
+   * Upload a local jar to {@code destJarFile} unless a file of the same length already exists there. An existing file
+   * of a different length is treated as another job's in-progress upload: wait, then re-check.
+   * @param fs destination file system
+   * @param localJar status of the local jar to upload
+   * @param maxAttempts number of failed attempts (copy failures or waits) tolerated before giving up
+   * @param destJarFile destination path, e.g. from {@link #calculateDestJarFilePath}
+   * @return true once the destination has the local jar's length; false if attempts ran out or the thread was interrupted
+   * @throws IOException if checking the destination fails
+   */
+  public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int maxAttempts, Path destJarFile) throws IOException {
+    int retryCount = 0;
+    while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
+      try {
+        if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
+          Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS);
+          throw new IOException("Waiting for file to complete on uploading ... ");
+        }
+        // No overwrite: if another job created the file first an IOException is thrown and the loop re-checks
+        fs.copyFromLocalFile(false, false, localJar.getPath(), destJarFile);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop retrying rather than swallowing the interruption
+        Thread.currentThread().interrupt();
+        log.error("Interrupted while waiting for jar file: " + destJarFile + " to be copied into hdfs", e);
+        return false;
+      } catch (IOException e) {
+        log.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
+        if (++retryCount >= maxAttempts) {
+          log.error("The jar file: " + destJarFile + " failed in being copied into hdfs", e);
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index c7dd6e85c1..4ce40f21df 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -108,6 +108,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.EmailUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
 import org.apache.gobblin.util.hadoop.TokenUtils;
 import org.apache.gobblin.util.io.StreamUtils;
 import org.apache.gobblin.util.logs.LogCopier;
@@ -162,7 +163,7 @@ public class GobblinYarnAppLauncher {
   public static final String GOBBLIN_YARN_CONFIG_OUTPUT_PATH = 
"gobblin.yarn.configOutputPath";
 
   //Configuration key to signal the GobblinYarnAppLauncher mode
-  public static final String GOBBLIN_YARN_APP_LAUNCHER_MODE =  
"gobblin.yarn.appLauncherMode";
+  public static final String GOBBLIN_YARN_APP_LAUNCHER_MODE = 
"gobblin.yarn.appLauncherMode";
   public static final String DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE = "";
   public static final String AZKABAN_APP_LAUNCHER_MODE_KEY = "azkaban";
 
@@ -238,9 +239,13 @@ public class GobblinYarnAppLauncher {
   protected final String originalYarnRMAddress;
   protected final Map<String, YarnClient> potentialYarnClients = new 
HashMap<>();
   private YarnClient yarnClient;
+  private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
+
+  private final boolean jarCacheEnabled;
 
   public GobblinYarnAppLauncher(Config config, YarnConfiguration 
yarnConfiguration) throws IOException {
-    this.config = config;
+    this.config = 
config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
+        ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
 
     this.applicationName = 
config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY);
     this.appQueueName = 
config.getString(GobblinYarnConfigurationKeys.APP_QUEUE_KEY);
@@ -254,7 +259,6 @@ public class GobblinYarnAppLauncher {
     this.fs = GobblinClusterUtils.buildFileSystem(config, 
this.yarnConfiguration);
     this.closer.register(this.fs);
 
-
     boolean isHelixEnabled = ConfigUtils.getBoolean(config, 
GobblinYarnConfigurationKeys.HELIX_ENABLED,
         GobblinYarnConfigurationKeys.DEFAULT_HELIX_ENABLED);
     this.helixClusterLifecycleManager = isHelixEnabled
@@ -300,10 +304,10 @@ public class GobblinYarnAppLauncher {
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
 
     this.detachOnExitEnabled = ConfigUtils
-        .getBoolean(config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
+        .getBoolean(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
             GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT);
-    this.appLauncherMode = ConfigUtils.getString(config, 
GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
-
+    this.appLauncherMode = ConfigUtils.getString(this.config, 
GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
+    this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
 
     try {
       config = addDynamicConfig(config);
@@ -334,7 +338,7 @@ public class GobblinYarnAppLauncher {
     this.eventBus.register(this);
 
     //Before connect with yarn client, we need to login to get the token
-    if(ConfigUtils.getBoolean(config, 
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
+    if (ConfigUtils.getBoolean(config, 
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
       this.tokenRefreshManager = Optional.of(buildTokenRefreshManager());
       this.tokenRefreshManager.get().loginAndScheduleTokenRenewal();
     }
@@ -370,7 +374,7 @@ public class GobblinYarnAppLauncher {
     return this.applicationId.isPresent() && !this.applicationCompleted && 
!this.detachOnExitEnabled;
   }
 
-  private void addServices() throws IOException{
+  private void addServices() throws IOException {
     List<Service> services = Lists.newArrayList();
     if (this.tokenRefreshManager.isPresent()) {
       LOGGER.info("Adding KeyManagerService since key management is enabled");
@@ -396,7 +400,7 @@ public class GobblinYarnAppLauncher {
       LOGGER.warn("NOT starting the admin UI because the job execution info 
server is NOT enabled");
     }
 
-    if (services.size() > 0 ) {
+    if (services.size() > 0) {
       this.serviceManager = Optional.of(new ServiceManager(services));
       this.serviceManager.get().startAsync();
     } else {
@@ -428,7 +432,6 @@ public class GobblinYarnAppLauncher {
       if (!this.detachOnExitEnabled) {
         LOGGER.info("Disabling all live Helix instances..");
       }
-
     } finally {
       try {
         if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
@@ -561,7 +564,7 @@ public class GobblinYarnAppLauncher {
 
   /**
    * Setup and submit the Gobblin Yarn application.
-   *
+   * Retain at least the current and last month's jars in the cache (if 
configured) to handle executions running for ~30 days max
    * @throws IOException if there's anything wrong setting up and submitting 
the Yarn application
    * @throws YarnException if there's anything wrong setting up and submitting 
the Yarn application
    */
@@ -573,7 +576,7 @@ public class GobblinYarnAppLauncher {
     appSubmissionContext.setApplicationType(GOBBLIN_YARN_APPLICATION_TYPE);
     appSubmissionContext.setMaxAppAttempts(ConfigUtils.getInt(config, 
GobblinYarnConfigurationKeys.APP_MASTER_MAX_ATTEMPTS_KEY, 
GobblinYarnConfigurationKeys.DEFAULT_APP_MASTER_MAX_ATTEMPTS_KEY));
     ApplicationId applicationId = appSubmissionContext.getApplicationId();
-    LOGGER.info("created new yarn application: "+ applicationId.getId());
+    LOGGER.info("created new yarn application: " + applicationId.getId());
 
     GetNewApplicationResponse newApplicationResponse = 
gobblinYarnApp.getNewApplicationResponse();
     // Set up resource type requirements for ApplicationMaster
@@ -587,6 +590,15 @@ public class GobblinYarnAppLauncher {
     
amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
     
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(),
 resource.getMemory())));
 
+    if (this.jarCacheEnabled) {
+      Path jarCachePath = 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config);
+      // Retain at least the current and last month's jars to handle 
executions running for ~30 days max
+      boolean cleanedSuccessfully = 
YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
+      if (!cleanedSuccessfully) {
+        LOGGER.warn("Failed to delete older jar cache directories");
+      }
+    }
+
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
     amContainerLaunchContext.setApplicationACLs(acls);
@@ -640,31 +652,36 @@ public class GobblinYarnAppLauncher {
 
   private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId 
applicationId) throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
+    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
 
     Path appMasterWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
-    LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", 
appMasterWorkDir.toString());
+    Path appMasterJarsCacheDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
+    LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", 
appMasterWorkDir);
+    LOGGER.info("Configured GobblinApplicationMaster jars directory to: {}", 
appMasterJarsCacheDir);
 
     Map<String, LocalResource> appMasterResources = Maps.newHashMap();
     FileSystem localFs = FileSystem.getLocal(new Configuration());
 
-    // NOTE: log after each step below for insight into what takes bulk of time
     if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
-      Path libJarsDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+      // Lib jars are shared between all containers, store at the root level
+      Path libJarsDestDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+      Path unsharedJarsDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
       addLibJars(new 
Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)),
-          Optional.of(appMasterResources), libJarsDestDir, localFs);
-      LOGGER.info("Added lib jars to directory: {}", 
libJarsDestDir.toString());
+          Optional.of(appMasterResources), libJarsDestDir, 
unsharedJarsDestDir, localFs);
+      LOGGER.info("Added lib jars to directory: {} and execution-private 
directory: {}", libJarsDestDir, unsharedJarsDestDir);
     }
     if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY)) 
{
-      Path appJarsDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+      Path appJarsDestDir = new Path(appMasterJarsCacheDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+      Path unsharedJarsDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
       
addAppJars(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY),
-          Optional.of(appMasterResources), appJarsDestDir, localFs);
-      LOGGER.info("Added app jars to directory: {}", 
appJarsDestDir.toString());
+          Optional.of(appMasterResources), appJarsDestDir, 
unsharedJarsDestDir, localFs);
+      LOGGER.info("Added app jars to directory: {} and execution-private 
directory: {}", appJarsDestDir, unsharedJarsDestDir);
     }
     if 
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY)) {
-      Path appFilesDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
+      Path appFilesDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
       
addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY),
           Optional.of(appMasterResources), appFilesDestDir, localFs);
-      LOGGER.info("Added app local files to directory: {}", 
appFilesDestDir.toString());
+      LOGGER.info("Added app local files to directory: {}", appFilesDestDir);
     }
     if 
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY)) 
{
       
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY),
@@ -677,27 +694,30 @@ public class GobblinYarnAppLauncher {
       LOGGER.info("Added remote zips to local resources");
     }
     if 
(this.config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) {
-      Path appFilesDestDir = new Path(appMasterWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
+      Path appFilesDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
       
addJobConfPackage(this.config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY),
 appFilesDestDir,
           appMasterResources);
-      LOGGER.info("Added job conf package to directory: {}", 
appFilesDestDir.toString());
+      LOGGER.info("Added job conf package to directory: {}", appFilesDestDir);
     }
 
     return appMasterResources;
   }
 
   private void addContainerLocalResources(ApplicationId applicationId) throws 
IOException {
-    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
-
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName,
+        applicationId.toString());
+    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
-    LOGGER.info("Configured Container work directory to: {}", 
containerWorkDir.toString());
-
+    Path containerJarsRootDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    LOGGER.info("Configured Container work directory to: {}", 
containerWorkDir);
+    LOGGER.info("Configured Container jars directory to: {}", 
containerJarsRootDir);
     FileSystem localFs = FileSystem.getLocal(new Configuration());
 
     if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
-      Path appJarsDestDir = new Path(containerWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+      Path appJarsDestDir = new Path(containerJarsRootDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+      Path unsharedJarsDestDir = new Path(containerWorkDir, 
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
       
addAppJars(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY),
-          Optional.<Map<String, LocalResource>>absent(), appJarsDestDir, 
localFs);
+          Optional.<Map<String, LocalResource>>absent(), appJarsDestDir, 
unsharedJarsDestDir, localFs);
     }
     if 
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
       Path appFilesDestDir = new Path(containerWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
@@ -706,7 +726,7 @@ public class GobblinYarnAppLauncher {
     }
   }
 
-  private void addLibJars(Path srcLibJarDir, Optional<Map<String, 
LocalResource>> resourceMap, Path destDir,
+  private void addLibJars(Path srcLibJarDir, Optional<Map<String, 
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
       FileSystem localFs) throws IOException {
 
     // Missing classpath-jars will be a fatal error.
@@ -720,26 +740,26 @@ public class GobblinYarnAppLauncher {
     }
 
     for (FileStatus libJarFile : libJarFiles) {
-      Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
-      this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
-      if (resourceMap.isPresent()) {
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs, 
libJarFile.getPath().getName(), unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) 
{
         YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+      } else {
+        LOGGER.warn("Failed to upload jar file {} to HDFS", 
libJarFile.getPath());
       }
     }
   }
-
-  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap,
-      Path destDir, FileSystem localFs) throws IOException {
+  private void addAppJars(String jarFilePathList, Optional<Map<String, 
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+      FileSystem localFs) throws IOException {
     for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
       Path srcFilePath = new Path(jarFilePath);
-      Path destFilePath = new Path(destDir, srcFilePath.getName());
-      if (localFs.exists(srcFilePath)) {
-        this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+      FileStatus localJar = localFs.getFileStatus(srcFilePath);
+      Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs, 
localJar.getPath().getName(), unsharedDir, destCacheDir);
+      if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, 
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+        if (resourceMap.isPresent()) {
+          YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        }
       } else {
-        LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
-      }
-      if (resourceMap.isPresent()) {
-        YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
       }
     }
   }
@@ -751,7 +771,11 @@ public class GobblinYarnAppLauncher {
       Path srcFilePath = new Path(localFilePath);
       Path destFilePath = new Path(destDir, srcFilePath.getName());
       if (localFs.exists(srcFilePath)) {
-        this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+        if (this.fs.exists(destFilePath)) {
+          LOGGER.info("The destination app jar {} already exists, skipping 
upload", destFilePath);
+        } else {
+          this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+        }
         if (resourceMap.isPresent()) {
           YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
         }
@@ -789,6 +813,7 @@ public class GobblinYarnAppLauncher {
         .append(" 
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(logFileName).append(".").append(ApplicationConstants.STDOUT)
+        .append(" 
-D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY))
         .append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
         .append(" ").append(appMasterClass.getName())
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
@@ -822,7 +847,7 @@ public class GobblinYarnAppLauncher {
     // Only pass token here and no secrets. (since there is no simple way to 
remove single token/ get secrets)
     // For RM token, only pass the RM token for the current RM, or the RM will 
fail to update the token
     Credentials finalCredentials = new Credentials();
-    for (Token<? extends TokenIdentifier> token: credentials.getAllTokens()) {
+    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
       if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) && 
!token.getService().equals(new Text(this.originalYarnRMAddress))) {
         continue;
       }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 1a5bc96c86..93cc23dba6 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -49,6 +49,15 @@ public class GobblinYarnConfigurationKeys {
   public static final String YARN_RESOURCE_MANAGER_IDS = 
YARN_RESOURCE_MANAGER_PREFIX + "ids";
   public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES= 
"other.yarn.resourcemanager.addresses";
 
+  public static final String JAR_CACHE_ENABLED = GOBBLIN_YARN_PREFIX + 
"jar.cache.enabled";
+
+  public static final boolean JAR_CACHE_ENABLED_DEFAULT = false;
+
+  public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + 
"jar.cache.dir";
+
+  // Used to store the start time of the app launcher to propagate to workers 
and appmaster
+  public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY = 
GOBBLIN_YARN_PREFIX + "application.start.time";
+
   // Gobblin Yarn ApplicationMaster configuration properties.
   public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + 
"app.master.memory.mbs";
   public static final String APP_MASTER_CORES_KEY = GOBBLIN_YARN_PREFIX + 
"app.master.cores";
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 72f9cc3363..4a74da44a5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -20,10 +20,14 @@ package org.apache.gobblin.yarn;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -199,6 +203,40 @@ public class YarnHelixUtils {
     }
   }
 
+  /**
+   * Calculate the path of a jar cache on HDFS, which is retained on a monthly 
basis.
+   * Should be used in conjunction with {@link 
#retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a 
periodic basis
+   * @param config
+   * @return
+   * @throws IOException
+   */
+  public static Path calculatePerMonthJarCachePath(Config config) throws 
IOException {
+    Path jarsCacheDirMonthly = new 
Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR));
+    String monthSuffix = new 
SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
+    return new Path(jarsCacheDirMonthly, monthSuffix);
+
+  }
+
+  /**
+   * Retain the latest k jar cache paths that are children of the parent cache 
path.
+   * @param parentCachePath
+   * @param k the number of latest jar cache paths to retain
+   * @param fs
+   * @return
+   * @throws IllegalAccessException
+   * @throws IOException
+   */
+  public static boolean retainKLatestJarCachePaths(Path parentCachePath, int 
k, FileSystem fs) throws IOException {
+    // Cleanup old cache if necessary
+    List<FileStatus> jarDirs =
+        Arrays.stream(fs.exists(parentCachePath) ? 
fs.listStatus(parentCachePath) : new 
FileStatus[0]).sorted().collect(Collectors.toList());
+    boolean deletesSuccessful = true;
+    for (int i = 0; i < jarDirs.size() - k; i++) {
+      deletesSuccessful &= fs.delete(jarDirs.get(i).getPath(), true);
+    }
+    return deletesSuccessful;
+  }
+
   public static void addRemoteFilesToLocalResources(String hdfsFileList, 
Map<String, LocalResource> resourceMap, Configuration yarnConfiguration) throws 
IOException {
     for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) {
       Path srcFilePath = new Path(hdfsFilePath);
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
index 033a24aa90..c271258c1d 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
@@ -18,14 +18,22 @@ package org.apache.gobblin.yarn;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
 
 public class YarnHelixUtilsTest {
   /**
@@ -33,6 +41,14 @@ public class YarnHelixUtilsTest {
    * added to the resources folder.
    * @throws IOException
    */
+  String tempDir = Files.createTempDir().getPath();
+
+  @AfterClass
+  public void tearDown() throws IOException{
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.delete(new Path(this.tempDir), true);
+  }
+
   @Test
   public void testUpdateToken()
       throws IOException {
@@ -51,4 +67,36 @@ public class YarnHelixUtilsTest {
     Token<?> readToken = credentials.getToken(new Text("testService"));
     Assert.assertNotNull(readToken);
   }
+
+  @Test
+  public void testGetJarCachePath() throws IOException {
+    Config config = ConfigFactory.empty()
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 ConfigValueFactory.fromAnyRef(1726074000013L))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef("/tmp"));
+    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+
+    Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09"));
+  }
+
+  @Test
+  public void retainLatestKJarCachePaths() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Config config = ConfigFactory.empty()
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 ConfigValueFactory.fromAnyRef(1726074000013L))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp"));
+    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+    fs.mkdirs(jarCachePath);
+    fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08"));
+    fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07"));
+    fs.mkdirs(new Path(jarCachePath.getParent(), "2024-06"));
+
+    YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, fs);
+
+    Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-09")));
+    Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-08")));
+    // Should be cleaned up
+    Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07")));
+    Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-06")));
+
+  }
 }
\ No newline at end of file


Reply via email to