This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 22cfc5ceb [GOBBLIN-1785] add MR_JARS_BASE_DIR and logic to delete old
mr jar dirs (#3642)
22cfc5ceb is described below
commit 22cfc5ceb2d89aeadf6626763c3e67a0b7d816f8
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Apr 17 15:49:39 2023 -0700
[GOBBLIN-1785] add MR_JARS_BASE_DIR and logic to delete old mr jar dirs
(#3642)
* add MR_JARS_BASE_DIR and logic to delete old mr jar dirs
* add unit test
* address review comments
---
.../gobblin/configuration/ConfigurationKeys.java | 4 ++++
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 25 ++++++++++++++++++++--
.../runtime/mapreduce/MRJobLauncherTest.java | 20 +++++++++++++++++
3 files changed, 47 insertions(+), 2 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index cb686d2d1..f0e15bf94 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -687,7 +687,11 @@ public class ConfigurationKeys {
*/
public static final String MR_JOB_ROOT_DIR_KEY = "mr.job.root.dir";
/** Specifies a static location in HDFS to upload jars to. Useful for
sharing jars across different Gobblin runs.*/
+ @Deprecated // Deprecated; use MR_JARS_BASE_DIR instead
public static final String MR_JARS_DIR = "mr.jars.dir";
+ // The dir pointed to by MR_JARS_BASE_DIR contains month-partitioned subdirs that store jar files; old subdirs are cleaned up on a regular basis.
+ // This retention feature is not available for the dir pointed to by MR_JARS_DIR.
+ public static final String MR_JARS_BASE_DIR = "mr.jars.base.dir";
public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
public static final String MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY =
"mr.job.map.failure.is.fatal";
public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 9f7c38360..bb89f9c6d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -21,11 +21,14 @@ import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -228,8 +231,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.fs.delete(this.mrJobDir, true);
}
this.unsharedJarsDir = new Path(this.mrJobDir, JARS_DIR_NAME);
- this.jarsDir = this.jobProps.containsKey(ConfigurationKeys.MR_JARS_DIR) ?
new Path(
- this.jobProps.getProperty(ConfigurationKeys.MR_JARS_DIR)) :
this.unsharedJarsDir;
+
+ if (this.jobProps.containsKey(ConfigurationKeys.MR_JARS_BASE_DIR)) {
+ Path jarsBaseDir = new
Path(this.jobProps.getProperty(ConfigurationKeys.MR_JARS_BASE_DIR));
+ String monthSuffix = new
SimpleDateFormat("yyyy-MM").format(System.currentTimeMillis());
+ cleanUpOldJarsDirIfRequired(this.fs, jarsBaseDir);
+ this.jarsDir = new Path(jarsBaseDir, monthSuffix);
+ } else if (this.jobProps.containsKey(ConfigurationKeys.MR_JARS_DIR)) {
+ this.jarsDir = new
Path(this.jobProps.getProperty(ConfigurationKeys.MR_JARS_DIR));
+ } else {
+ this.jarsDir = this.unsharedJarsDir;
+ }
+
this.fs.mkdirs(this.mrJobDir);
this.jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);
@@ -264,6 +277,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
startCancellationExecutor();
}
+ static void cleanUpOldJarsDirIfRequired(FileSystem fs, Path jarsBaseDir)
throws IOException {
+ List<FileStatus> jarDirs = Arrays.stream(fs.exists(jarsBaseDir)
+ ? fs.listStatus(jarsBaseDir) : new
FileStatus[0]).sorted().collect(Collectors.toList());
+ if (jarDirs.size() > 2) {
+ fs.delete(jarDirs.get(0).getPath(), true);
+ }
+ }
+
@Override
public void close() throws IOException {
try {
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 11d4df975..2a45ad30e 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -26,6 +26,9 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.jboss.byteman.contrib.bmunit.BMNGRunner;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
@@ -37,6 +40,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -117,6 +121,22 @@ public class MRJobLauncherTest extends BMNGRunner {
log.info("out");
}
+ @Test
+ public void testCleanUpMrJarsBaseDir() throws Exception {
+ File tmpDirectory = Files.createTempDir();
+ tmpDirectory.deleteOnExit();
+ FileSystem fs = FileSystem.get(new Configuration());
+ String baseJarDir = tmpDirectory.getAbsolutePath();
+ fs.mkdirs(new Path(baseJarDir, "2023-01"));
+ fs.mkdirs(new Path(baseJarDir, "2022-12"));
+ fs.mkdirs(new Path(baseJarDir, "2023-02"));
+ MRJobLauncher.cleanUpOldJarsDirIfRequired(FileSystem.get(new
Configuration()), new Path(tmpDirectory.getPath()));
+ Assert.assertFalse(fs.exists(new Path(baseJarDir, "2022-12")));
+ Assert.assertTrue(fs.exists(new Path(baseJarDir, "2023-01")));
+ Assert.assertTrue(fs.exists(new Path(baseJarDir, "2023-02")));
+ fs.delete(new Path(baseJarDir), true);
+ }
+
@Test
public void testNumOfWorkunits() throws Exception {
Properties jobProps = loadJobProps();