This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d445b1e2fe Update Jar cache directory partitioning bi-monthly from
monthly (#4165)
d445b1e2fe is described below
commit d445b1e2fecf014662c335f90c3a937f967016d1
Author: Agam Pal Singh <[email protected]>
AuthorDate: Wed Feb 18 20:18:27 2026 +0530
Update Jar cache directory partitioning bi-monthly from monthly (#4165)
* updated cache directory partitioning to bi-monthly
---
.../gobblin/yarn/GobblinYarnAppLauncher.java | 2 +-
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 91 ++++++++--
.../apache/gobblin/yarn/YarnHelixUtilsTest.java | 186 ++++++++++++++++++++-
3 files changed, 252 insertions(+), 27 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 4311e4ad77..019410957c 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -609,7 +609,7 @@ public class GobblinYarnAppLauncher {
if (this.jarCacheEnabled) {
Path jarCachePath =
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs);
- // Retain at least the current and last month's jars to handle
executions running for ~30 days max
+ // Retain 2 semi-monthly periods (current + 1 previous) to handle
executions running in both periods
boolean cleanedSuccessfully =
YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
if (!cleanedSuccessfully) {
LOGGER.warn("Failed to delete older jar cache directories");
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 1a4113dbee..03aa18f0ae 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -207,43 +209,98 @@ public class YarnHelixUtils {
}
/**
- * Calculate the path of a jar cache on HDFS, which is retained on a monthly
basis.
- * Should be used in conjunction with {@link
#retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a
periodic basis
+ * Calculate the path of a jar cache on HDFS, which is retained on a
semi-monthly basis (twice per month).
+ * Each month is split into two periods:
+ * - Period 1: Days 1-15 (suffix: yyyy-MM.1)
+ * - Period 2: Days 16-end (suffix: yyyy-MM.2)
+ *
+ * Should be used in conjunction with {@link
#retainKLatestJarCachePaths(Path, int, FileSystem)} to clean up the cache on a
periodic basis.
+ *
* @param config the configuration
* @param fs the filesystem to use for validation
- * @return the monthly jar cache path
+ * @return the semi-monthly jar cache path
* @throws IOException if filesystem operations fail
*/
public static Path calculatePerMonthJarCachePath(Config config, FileSystem
fs) throws IOException {
// Use JarCachePathResolver to resolve the base jar cache directory
Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs);
-
- // Append monthly suffix
- String monthSuffix = new SimpleDateFormat("yyyy-MM").format(
-
config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
- return new Path(baseCacheDir, monthSuffix);
+
+ // Calculate semi-monthly suffix based on day of month
+ long startTime =
config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY);
+ Calendar cal = java.util.Calendar.getInstance();
+ cal.setTimeInMillis(startTime);
+
+ String yearMonth = new SimpleDateFormat("yyyy-MM").format(cal.getTime());
+ int dayOfMonth = cal.get(java.util.Calendar.DAY_OF_MONTH);
+
+ // Partition by 15th of month: days 1-15 = period 1, days 16-end = period 2
+ String periodSuffix = dayOfMonth <= 15 ? "1" : "2";
+ String cacheSuffix = yearMonth + "." + periodSuffix;
+
+ return new Path(baseCacheDir, cacheSuffix);
}
/**
* Retain the latest k jar cache paths that are children of the parent cache
path.
- * @param parentCachePath
+ * Handles both old monthly format (yyyy-MM) and new semi-monthly format
(yyyy-MM.1, yyyy-MM.2).
+ * During migration, old monthly directories are treated as belonging to the
first half of the month
+ * for sorting purposes (equivalent to yyyy-MM.1), ensuring they are cleaned
up before newer semi-monthly periods.
+ *
+ * @param parentCachePath the parent directory containing jar cache
subdirectories
* @param k the number of latest jar cache paths to retain
- * @param fs
- * @return
- * @throws IllegalAccessException
- * @throws IOException
+ * @param fs the filesystem
+ * @return true if all deletes were successful, false otherwise
+ * @throws IOException if filesystem operations fail
*/
public static boolean retainKLatestJarCachePaths(Path parentCachePath, int
k, FileSystem fs) throws IOException {
- // Cleanup old cache if necessary
- List<FileStatus> jarDirs =
- Arrays.stream(fs.exists(parentCachePath) ?
fs.listStatus(parentCachePath) : new
FileStatus[0]).sorted().collect(Collectors.toList());
+ if (!fs.exists(parentCachePath)) {
+ return true;
+ }
+
+ // Get all jar cache directories
+ FileStatus[] allDirs = fs.listStatus(parentCachePath);
+
+ // Sort by normalized path name for proper chronological ordering
+ // Old format (yyyy-MM) is treated as yyyy-MM.1 for sorting
+ List<FileStatus> jarDirs = Arrays.stream(allDirs)
+ .sorted((a, b) -> {
+ String nameA =
normalizeDirectoryNameForSorting(a.getPath().getName());
+ String nameB =
normalizeDirectoryNameForSorting(b.getPath().getName());
+ return nameA.compareTo(nameB);
+ })
+ .collect(Collectors.toList());
+
+ // Delete oldest directories, keeping k latest
boolean deletesSuccessful = true;
for (int i = 0; i < jarDirs.size() - k; i++) {
- deletesSuccessful &= fs.delete(jarDirs.get(i).getPath(), true);
+ Path toDelete = jarDirs.get(i).getPath();
+ LOGGER.info("Deleting old jar cache directory: {}", toDelete);
+ deletesSuccessful &= fs.delete(toDelete, true);
}
+
return deletesSuccessful;
}
+ /**
+ * Normalizes directory names for sorting to handle migration from monthly
to semi-monthly format.
+ * Converts old monthly format (yyyy-MM) to yyyy-MM.1 for consistent
chronological sorting.
+ * This ensures old directories are cleaned up before newer semi-monthly
periods in the same month.
+ *
+ * @param dirName the directory name (e.g., "2024-09" or "2024-09.1")
+ * @return normalized name for sorting (e.g., "2024-09.1" or "2024-09.2")
+ */
+ @VisibleForTesting
+ static String normalizeDirectoryNameForSorting(String dirName) {
+ // Pattern: yyyy-MM or yyyy-MM.{1,2}
+ // If it's old format (yyyy-MM without period suffix), treat as yyyy-MM.1
+ if (dirName.matches("\\d{4}-\\d{2}$")) {
+ // Old monthly format - treat as first half of month for aggressive
cleanup
+ return dirName + ".1";
+ }
+ // Already in new format (yyyy-MM.1 or yyyy-MM.2) or unrecognized format
+ return dirName;
+ }
+
public static Set<String> getAppLibJarList(Config config) {
Set<String> libAppJars = new HashSet<>(Arrays.asList(
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
index df37a0cf8a..a7bac0a492 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
@@ -73,13 +73,57 @@ public class YarnHelixUtilsTest {
@Test
public void testGetJarCachePath() throws IOException {
FileSystem mockFs = Mockito.mock(FileSystem.class);
+ // 1726074000013L = Sept 11, 2024 17:00:00 GMT (day 11, first half of
month)
Config config = ConfigFactory.empty()
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726074000013L))
.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef("/tmp"))
.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
mockFs);
- Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09"));
+ // Sept 11 is in first half (1-15), so expect yyyy-MM.1 format
+ Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09.1"));
+ }
+
+ @Test
+ public void testGetJarCachePath_SecondHalfOfMonth() throws IOException {
+ FileSystem mockFs = Mockito.mock(FileSystem.class);
+ // 1726596000000L = Sept 17, 2024 18:00:00 GMT (day 17, second half of
month)
+ Config config = ConfigFactory.empty()
+
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726596000000L))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef("/tmp"))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
+ Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
mockFs);
+
+ // Sept 17 is in second half (16-end), so expect yyyy-MM.2 format
+ Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09.2"));
+ }
+
+ @Test
+ public void testGetJarCachePath_BoundaryDay15() throws IOException {
+ FileSystem mockFs = Mockito.mock(FileSystem.class);
+ // 1726358400000L = Sept 15, 2024 00:00:00 GMT (day 15, still first half)
+ Config config = ConfigFactory.empty()
+
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726358400000L))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef("/tmp"))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
+ Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
mockFs);
+
+ // Sept 15 is the last day of first half
+ Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09.1"));
+ }
+
+ @Test
+ public void testGetJarCachePath_BoundaryDay16() throws IOException {
+ FileSystem mockFs = Mockito.mock(FileSystem.class);
+ // 1726444800000L = Sept 16, 2024 00:00:00 GMT (day 16, second half; the
day is read in the JVM default time zone, so zones behind GMT see day 15)
+ Config config = ConfigFactory.empty()
+
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726444800000L))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef("/tmp"))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
+ Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
mockFs);
+
+ // Sept 16 is the first day of second half
+ Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09.2"));
}
@Test
@@ -91,17 +135,141 @@ public class YarnHelixUtilsTest {
.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
fs);
fs.mkdirs(jarCachePath);
- fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08"));
- fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07"));
- fs.mkdirs(new Path(jarCachePath.getParent(), "2024-06"));
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08.2"));
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08.1"));
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07.2"));
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07.1"));
- YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, fs);
+ // Retain 3 latest periods (current + 2 previous)
+ YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 3, fs);
- Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-09")));
- Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-08")));
+ // Should keep the 3 latest
+ Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-09.1")));
+ Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-08.2")));
+ Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-08.1")));
// Should be cleaned up
- Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07")));
- Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-06")));
+ Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07.2")));
+ Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07.1")));
+ }
+
+ @Test
+ public void retainLatestKJarCachePaths_OnlyOldMonthlyFormat() throws
IOException {
+ // Test cleanup with only old monthly format directories (pre-migration
scenario)
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path testDir = new Path(this.tempDir, "tmp-old-only");
+
+ // Create only old monthly format directories
+ fs.mkdirs(new Path(testDir, "2024-09")); // Current (most recent)
+ fs.mkdirs(new Path(testDir, "2024-08"));
+ fs.mkdirs(new Path(testDir, "2024-07"));
+ fs.mkdirs(new Path(testDir, "2024-06"));
+ fs.mkdirs(new Path(testDir, "2024-05"));
+
+ // Retain 3 latest periods
+ YarnHelixUtils.retainKLatestJarCachePaths(testDir, 3, fs);
+
+ // Should keep the 3 most recent monthly directories
+ Assert.assertTrue("2024-09 should be kept", fs.exists(new Path(testDir,
"2024-09")));
+ Assert.assertTrue("2024-08 should be kept", fs.exists(new Path(testDir,
"2024-08")));
+ Assert.assertTrue("2024-07 should be kept", fs.exists(new Path(testDir,
"2024-07")));
+
+ // These should be deleted
+ Assert.assertFalse("2024-06 should be deleted", fs.exists(new
Path(testDir, "2024-06")));
+ Assert.assertFalse("2024-05 should be deleted", fs.exists(new
Path(testDir, "2024-05")));
+ }
+
+ @Test
+ public void retainLatestKJarCachePaths_MixedFormats() throws IOException {
+ // Test cleanup with both old monthly and new semi-monthly formats
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Config config = ConfigFactory.empty()
+
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726596000000L))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp-mixed"))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
+ Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
fs);
+ // Current path is 2024-09.2
+ fs.mkdirs(jarCachePath);
+
+ // Create mix of old monthly and new semi-monthly formats
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-09.1")); // New format
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-09")); // Old format
(treated as 2024-09.1)
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08.2")); // New format
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08")); // Old format
(treated as 2024-08.1)
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08.1")); // New format
+ fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07")); // Old format
(should be deleted)
+
+ // Retain 3 latest periods
+ YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 3, fs);
+
+ // Expected retention (keeping 3 latest):
+ // After normalization and sorting, the order is:
+ // 1. 2024-07 (normalized to 2024-07.1)
+ // 2. 2024-08 (normalized to 2024-08.1)
+ // 3. 2024-08.1 (normalized to 2024-08.1)
+ // 4. 2024-08.2 (normalized to 2024-08.2)
+ // 5. 2024-09 (normalized to 2024-09.1)
+ // 6. 2024-09.1 (normalized to 2024-09.1)
+ // 7. 2024-09.2 (normalized to 2024-09.2)
+ //
+ // Keeping k=3 latest means we keep entries 5, 6, 7:
+ // Keep: 2024-09 (old), 2024-09.1, 2024-09.2
+ // Delete: Everything from August and July
+
+ Assert.assertTrue("Current period 2024-09.2 should be kept", fs.exists(new
Path(this.tempDir, "tmp-mixed/2024-09.2")));
+ Assert.assertTrue("2024-09.1 should be kept", fs.exists(new
Path(this.tempDir, "tmp-mixed/2024-09.1")));
+ Assert.assertTrue("Old format 2024-09 should be kept (sorts same as
2024-09.1)", fs.exists(new Path(this.tempDir, "tmp-mixed/2024-09")));
+
+ // All August and July directories should be cleaned up
+ Assert.assertFalse("2024-08.2 should be deleted", fs.exists(new
Path(this.tempDir, "tmp-mixed/2024-08.2")));
+ Assert.assertFalse("Old format 2024-08 should be deleted", fs.exists(new
Path(this.tempDir, "tmp-mixed/2024-08")));
+ Assert.assertFalse("2024-08.1 should be deleted", fs.exists(new
Path(this.tempDir, "tmp-mixed/2024-08.1")));
+ Assert.assertFalse("Old format 2024-07 should be deleted", fs.exists(new
Path(this.tempDir, "tmp-mixed/2024-07")));
+ }
+
+ @Test
+ public void retainLatestKJarCachePaths_EarlyMigrationPhase() throws
IOException {
+ // Test early migration: mostly old monthly with just one new semi-monthly
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path testDir = new Path(this.tempDir, "tmp-early-migration");
+
+ // Create mostly old format with one new semi-monthly (simulating first
deployment)
+ fs.mkdirs(new Path(testDir, "2024-10.1")); // NEW: First semi-monthly
period created
+ fs.mkdirs(new Path(testDir, "2024-09")); // OLD: Previous monthly
+ fs.mkdirs(new Path(testDir, "2024-08")); // OLD
+ fs.mkdirs(new Path(testDir, "2024-07")); // OLD
+ fs.mkdirs(new Path(testDir, "2024-06")); // OLD
+
+ // Retain 3 latest periods
+ YarnHelixUtils.retainKLatestJarCachePaths(testDir, 3, fs);
+
+ // After sorting with normalization (old format treated as .1):
+ // 2024-06 -> 2024-06.1
+ // 2024-07 -> 2024-07.1
+ // 2024-08 -> 2024-08.1
+ // 2024-09 -> 2024-09.1
+ // 2024-10.1 -> 2024-10.1 (stays)
+ //
+ // Keep last 3: 2024-08, 2024-09, 2024-10.1
+
+ Assert.assertTrue("New format 2024-10.1 should be kept", fs.exists(new
Path(testDir, "2024-10.1")));
+ Assert.assertTrue("Old format 2024-09 should be kept", fs.exists(new
Path(testDir, "2024-09")));
+ Assert.assertTrue("Old format 2024-08 should be kept", fs.exists(new
Path(testDir, "2024-08")));
+
+ // These should be deleted
+ Assert.assertFalse("Old format 2024-07 should be deleted", fs.exists(new
Path(testDir, "2024-07")));
+ Assert.assertFalse("Old format 2024-06 should be deleted", fs.exists(new
Path(testDir, "2024-06")));
+ }
+
+ @Test
+ public void retainLatestKJarCachePaths_EmptyDirectory() throws IOException {
+ // Test edge case: empty parent directory
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path testDir = new Path(this.tempDir, "tmp-empty");
+ fs.mkdirs(testDir);
+
+ // Should not throw exception on empty directory
+ boolean result = YarnHelixUtils.retainKLatestJarCachePaths(testDir, 3, fs);
+ Assert.assertTrue("Should return true for empty directory", result);
}
@Test