This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 22cfc5ceb [GOBBLIN-1785] add MR_JARS_BASE_DIR and logic to delete old mr jar dirs (#3642)
22cfc5ceb is described below

commit 22cfc5ceb2d89aeadf6626763c3e67a0b7d816f8
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Apr 17 15:49:39 2023 -0700

    [GOBBLIN-1785] add MR_JARS_BASE_DIR and logic to delete old mr jar dirs (#3642)
    
    * add MR_JARS_BASE_DIR and logic to delete old mr jar dirs
    
    * add unit test
    
    * address review comments
---
 .../gobblin/configuration/ConfigurationKeys.java   |  4 ++++
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 25 ++++++++++++++++++++--
 .../runtime/mapreduce/MRJobLauncherTest.java       | 20 +++++++++++++++++
 3 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index cb686d2d1..f0e15bf94 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -687,7 +687,11 @@ public class ConfigurationKeys {
    */
   public static final String MR_JOB_ROOT_DIR_KEY = "mr.job.root.dir";
   /** Specifies a static location in HDFS to upload jars to. Useful for sharing jars across different Gobblin runs.*/
+  @Deprecated // Deprecated; use MR_JARS_BASE_DIR instead
   public static final String MR_JARS_DIR = "mr.jars.dir";
+  // dir pointed by MR_JARS_BASE_DIR has month partitioned dirs to store jar files and are cleaned up on a regular basis
+  // retention feature is not available for dir pointed by MR_JARS_DIR
+  public static final String MR_JARS_BASE_DIR = "mr.jars.base.dir";
   public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
   public static final String MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY = 
"mr.job.map.failure.is.fatal";
   public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 9f7c38360..bb89f9c6d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -21,11 +21,14 @@ import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -228,8 +231,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
       this.fs.delete(this.mrJobDir, true);
     }
     this.unsharedJarsDir = new Path(this.mrJobDir, JARS_DIR_NAME);
-    this.jarsDir = this.jobProps.containsKey(ConfigurationKeys.MR_JARS_DIR) ? 
new Path(
-        this.jobProps.getProperty(ConfigurationKeys.MR_JARS_DIR)) : 
this.unsharedJarsDir;
+
+    if (this.jobProps.containsKey(ConfigurationKeys.MR_JARS_BASE_DIR)) {
+      Path jarsBaseDir = new 
Path(this.jobProps.getProperty(ConfigurationKeys.MR_JARS_BASE_DIR));
+      String monthSuffix = new 
SimpleDateFormat("yyyy-MM").format(System.currentTimeMillis());
+      cleanUpOldJarsDirIfRequired(this.fs, jarsBaseDir);
+      this.jarsDir = new Path(jarsBaseDir, monthSuffix);
+    } else if (this.jobProps.containsKey(ConfigurationKeys.MR_JARS_DIR)) {
+      this.jarsDir = new 
Path(this.jobProps.getProperty(ConfigurationKeys.MR_JARS_DIR));
+    } else {
+      this.jarsDir = this.unsharedJarsDir;
+    }
+
     this.fs.mkdirs(this.mrJobDir);
 
     this.jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);
@@ -264,6 +277,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
     startCancellationExecutor();
   }
 
+  static void cleanUpOldJarsDirIfRequired(FileSystem fs, Path jarsBaseDir) 
throws IOException {
+    List<FileStatus> jarDirs = Arrays.stream(fs.exists(jarsBaseDir)
+        ? fs.listStatus(jarsBaseDir) : new 
FileStatus[0]).sorted().collect(Collectors.toList());
+    if (jarDirs.size() > 2) {
+      fs.delete(jarDirs.get(0).getPath(), true);
+    }
+  }
+
   @Override
   public void close() throws IOException {
     try {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 11d4df975..2a45ad30e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -26,6 +26,9 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.jboss.byteman.contrib.bmunit.BMNGRunner;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMRules;
@@ -37,6 +40,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -117,6 +121,22 @@ public class MRJobLauncherTest extends BMNGRunner {
     log.info("out");
   }
 
+  @Test
+  public void testCleanUpMrJarsBaseDir() throws Exception {
+    File tmpDirectory = Files.createTempDir();
+    tmpDirectory.deleteOnExit();
+    FileSystem fs = FileSystem.get(new Configuration());
+    String baseJarDir = tmpDirectory.getAbsolutePath();
+    fs.mkdirs(new Path(baseJarDir, "2023-01"));
+    fs.mkdirs(new Path(baseJarDir, "2022-12"));
+    fs.mkdirs(new Path(baseJarDir, "2023-02"));
+    MRJobLauncher.cleanUpOldJarsDirIfRequired(FileSystem.get(new 
Configuration()), new Path(tmpDirectory.getPath()));
+    Assert.assertFalse(fs.exists(new Path(baseJarDir, "2022-12")));
+    Assert.assertTrue(fs.exists(new Path(baseJarDir, "2023-01")));
+    Assert.assertTrue(fs.exists(new Path(baseJarDir, "2023-02")));
+    fs.delete(new Path(baseJarDir), true);
+  }
+
   @Test
   public void testNumOfWorkunits() throws Exception {
     Properties jobProps = loadJobProps();

Reply via email to