This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee5b3efcf [GOBBLIN-2243]Add Fallback dir for Jar caching (#4160)
4ee5b3efcf is described below

commit 4ee5b3efcfe8e2604683fa8523a0ebd250eb1471
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Dec 18 11:48:41 2025 +0530

    [GOBBLIN-2243]Add Fallback dir for Jar caching (#4160)
    
    * added fallback caching dir
    
    * addressed comments
    
    * refactor
    
    * resolved comments
    
    * refactor
---
 .../apache/gobblin/temporal/yarn/YarnService.java  |   2 +-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |   6 +-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   6 +
 .../apache/gobblin/yarn/JarCachePathResolver.java  | 129 ++++++++++
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  20 +-
 .../gobblin/yarn/JarCachePathResolverTest.java     | 276 +++++++++++++++++++++
 .../apache/gobblin/yarn/YarnHelixUtilsTest.java    |  12 +-
 7 files changed, 435 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 2818982bab..d28d94e9c9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -394,7 +394,7 @@ class YarnService extends AbstractIdleService {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
     // Used for -SNAPSHOT versions of jars
     Path containerJarsUnsharedDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
-    Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+    Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
     Path containerJarsCachedDir = new Path(jarCacheDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
     LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
     LOGGER.info("Container execution-private jars root dir: " + 
containerJarsUnsharedDir);
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index d2f2180fff..4311e4ad77 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -608,7 +608,7 @@ public class GobblinYarnAppLauncher {
     
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(),
 resource.getMemory())));
 
     if (this.jarCacheEnabled) {
-      Path jarCachePath = 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config);
+      Path jarCachePath = 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs);
       // Retain at least the current and last month's jars to handle 
executions running for ~30 days max
       boolean cleanedSuccessfully = 
YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
       if (!cleanedSuccessfully) {
@@ -675,7 +675,7 @@ public class GobblinYarnAppLauncher {
 
   private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId 
applicationId) throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
-    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
 
     Path appMasterWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
     Path appMasterJarsCacheDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
@@ -730,7 +730,7 @@ public class GobblinYarnAppLauncher {
   private void addContainerLocalResources(ApplicationId applicationId) throws 
IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName,
         applicationId.toString());
-    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+    Path jarsRootDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
     Path containerJarsRootDir = new Path(jarsRootDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
     LOGGER.info("Configured Container work directory to: {}", 
containerWorkDir);
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 10bc9f9709..00af259a22 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -55,6 +55,12 @@ public class GobblinYarnConfigurationKeys {
 
   public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + 
"jar.cache.dir";
 
+  public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + 
"jar.cache.root.dir";
+
+  public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX 
+ "jar.cache.fallback.root.dir";
+
+  public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + 
"jar.cache.suffix";
+
   public static final String YARN_APPLICATION_LIB_JAR_LIST = 
GOBBLIN_YARN_PREFIX + "lib.jar.list";
 
   /**
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java
new file mode 100644
index 0000000000..302791ab9e
--- /dev/null
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Utility class for resolving the jar cache directory path by validating 
filesystem on path existence and applying fallback logic.
+ *
+ * <p>Resolution logic:</p>
+ * <ol>
+ *   <li>If JAR_CACHE_DIR is explicitly configured, uses it as-is (for 
backward compatibility)</li>
+ *   <li>Otherwise, validates JAR_CACHE_ROOT_DIR exists on filesystem</li>
+ *   <li>If not found, tries FALLBACK_JAR_CACHE_ROOT_DIR</li>
+ *   <li>Combines validated root with JAR_CACHE_SUFFIX (or default suffix) to 
form final path</li>
+ *   <li>If no valid root found, throws IOException</li>
+ * </ol>
+ */
+public class JarCachePathResolver {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JarCachePathResolver.class);
+  // Note: Trailing slash will be normalized away by Hadoop Path
+  private static final String DEFAULT_JAR_CACHE_SUFFIX = 
".gobblinCache/gobblin-temporal";
+
+  // Private constructor to prevent instantiation
+  private JarCachePathResolver() {
+  }
+
+  /**
+   * Resolves the jar cache directory path, applying validation and fallback 
logic.
+   *
+   * @param config the configuration
+   * @param fs the filesystem to use for validation
+   * @return the resolved jar cache directory path
+   * @throws IOException if filesystem operations fail or no valid root 
directory is found
+   */
+  public static Path resolveJarCachePath(Config config, FileSystem fs) throws 
IOException {
+    // If JAR_CACHE_DIR is explicitly set, use it as-is
+    if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) {
+      String explicitCacheDir = 
config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
+      LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", 
explicitCacheDir);
+      return new Path(explicitCacheDir);
+    }
+
+    // Get suffix from config, or use default if not configured or empty
+    String suffix = ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, "");
+    if (suffix == null || suffix.trim().isEmpty()) {
+      LOGGER.info("JAR_CACHE_SUFFIX not configured or empty, using default: 
{}", DEFAULT_JAR_CACHE_SUFFIX);
+      suffix = DEFAULT_JAR_CACHE_SUFFIX;
+    }
+
+    // Try primary root directory
+    if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) {
+      String rootDir = 
config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
+      Path resolvedPath = validateAndComputePath(fs, rootDir, suffix, 
GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
+      if (resolvedPath != null) {
+        return resolvedPath;
+      }
+    }
+
+    // Try fallback root directory
+    if 
(config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) {
+      String fallbackRootDir = 
config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
+      Path resolvedPath = validateAndComputePath(fs, fallbackRootDir, suffix, 
GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
+      if (resolvedPath != null) {
+        return resolvedPath;
+      }
+    }
+
+    // No valid root directory found - fail
+    throw new IOException("No valid jar cache root directory found. Please 
configure "
+        + GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or "
+        + GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR
+        + " with a valid directory path, or explicitly set "
+        + GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
+  }
+
+  /**
+   * Validates if the root directory exists and computes the full path with 
suffix.
+   *
+   * @param fs the filesystem to check
+   * @param rootDir the root directory to validate
+   * @param suffix the suffix to append
+   * @param configName the config name for logging
+   * @return the computed path if valid, null otherwise
+   * @throws IOException if filesystem operations fail
+   */
+  @VisibleForTesting
+  static Path validateAndComputePath(FileSystem fs, String rootDir, String 
suffix, String configName) throws IOException {
+    Path rootPath = new Path(rootDir);
+    if (fs.exists(rootPath)) {
+      // Strip leading '/' from suffix to ensure proper concatenation
+      // Otherwise, Hadoop Path treats it as absolute path and ignores the 
parent
+      String normalizedSuffix = suffix.startsWith("/") ? suffix.substring(1) : 
suffix;
+      Path fullPath = new Path(rootPath, normalizedSuffix);
+      LOGGER.info("{} exists: {}, resolved JAR_CACHE_DIR to: {}", configName, 
rootDir, fullPath);
+      return fullPath;
+    }
+    LOGGER.warn("Configured {} does not exist: {}", configName, rootDir);
+    return null;
+  }
+
+}
+
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index d38ebe52ee..1a4113dbee 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -209,15 +209,19 @@ public class YarnHelixUtils {
   /**
    * Calculate the path of a jar cache on HDFS, which is retained on a monthly 
basis.
    * Should be used in conjunction with {@link 
#retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a 
periodic basis
-   * @param config
-   * @return
-   * @throws IOException
+   * @param config the configuration
+   * @param fs the filesystem to use for validation
+   * @return the monthly jar cache path
+   * @throws IOException if filesystem operations fail
    */
-  public static Path calculatePerMonthJarCachePath(Config config) throws 
IOException {
-    Path jarsCacheDirMonthly = new 
Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR));
-    String monthSuffix = new 
SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
-    return new Path(jarsCacheDirMonthly, monthSuffix);
-
+  public static Path calculatePerMonthJarCachePath(Config config, FileSystem 
fs) throws IOException {
+    // Use JarCachePathResolver to resolve the base jar cache directory
+    Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs);
+    
+    // Append monthly suffix
+    String monthSuffix = new SimpleDateFormat("yyyy-MM").format(
+        
config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
+    return new Path(baseCacheDir, monthSuffix);
   }
 
   /**
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java
new file mode 100644
index 0000000000..466543be1c
--- /dev/null
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Tests for {@link JarCachePathResolver}.
+ */
+public class JarCachePathResolverTest {
+
+
+  @Test
+  public void testResolveJarCachePath_ExplicitJarCacheDir() throws IOException 
{
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String explicitCacheDir = "/explicit/cache/dir";
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef(explicitCacheDir))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should use explicitly configured JAR_CACHE_DIR
+    Assert.assertEquals(result.toString(), explicitCacheDir);
+    // Verify no filesystem checks were made (explicit config is used as-is)
+    Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(Path.class));
+  }
+
+  @Test
+  public void testResolveJarCachePath_RootDirExists() throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String rootDir = "/user/testuser";
+    String suffix = ".gobblinCache/gobblin-temporal/myproject";
+    String expectedFullPath = 
"/user/testuser/.gobblinCache/gobblin-temporal/myproject";
+    
+    // Mock: Root directory exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new 
Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        return path.toString().equals(rootDir);
+      }
+    });
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, 
ConfigValueFactory.fromAnyRef(suffix))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should resolve to root + suffix
+    Assert.assertEquals(result.toString(), expectedFullPath);
+  }
+
+  @Test
+  public void testResolveJarCachePath_RootDirNotExistsFallbackRootExists() 
throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String rootDir = "/user/baduser";
+    String fallbackRootDir = "/user/gooduser";
+    String suffix = ".gobblinCache/gobblin-temporal/myproject";
+    String expectedFullPath = 
"/user/gooduser/.gobblinCache/gobblin-temporal/myproject";
+    
+    // Mock: Root dir doesn't exist, but fallback root exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new 
Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        // Only fallback root directory exists
+        return path.toString().equals(fallbackRootDir);
+      }
+    });
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(fallbackRootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, 
ConfigValueFactory.fromAnyRef(suffix))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should resolve to fallback root + suffix
+    Assert.assertEquals(result.toString(), expectedFullPath);
+  }
+
+  @Test(expectedExceptions = IOException.class, 
+        expectedExceptionsMessageRegExp = ".*No valid jar cache root directory 
found.*")
+  public void testResolveJarCachePath_NeitherRootDirExists() throws 
IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String rootDir = "/user/baduser1";
+    String fallbackRootDir = "/user/baduser2";
+    String suffix = ".gobblinCache/gobblin-temporal/myproject";
+    
+    // Mock: Neither root directory exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenReturn(false);
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(fallbackRootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, 
ConfigValueFactory.fromAnyRef(suffix))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    // Should throw IOException when no valid root directory found
+    JarCachePathResolver.resolveJarCachePath(config, mockFs);
+  }
+
+  @Test
+  public void testResolveJarCachePath_OnlyFallbackRootConfigured() throws 
IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String fallbackRootDir = "/user/testuser";
+    String suffix = ".gobblinCache/gobblin-temporal/myproject";
+    String expectedFullPath = 
"/user/testuser/.gobblinCache/gobblin-temporal/myproject";
+    
+    // Mock: Fallback root directory exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new 
Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        return path.toString().equals(fallbackRootDir);
+      }
+    });
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(fallbackRootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, 
ConfigValueFactory.fromAnyRef(suffix))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should resolve to fallback root + suffix
+    Assert.assertEquals(result.toString(), expectedFullPath);
+  }
+
+  @Test(expectedExceptions = IOException.class,
+        expectedExceptionsMessageRegExp = ".*No valid jar cache root directory 
found.*")
+  public void testResolveJarCachePath_NoRootDirsConfigured() throws 
IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    // Should throw IOException when no root directories are configured
+    JarCachePathResolver.resolveJarCachePath(config, mockFs);
+  }
+
+  @Test
+  public void testResolveJarCachePath_DefaultSuffix() throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String rootDir = "/user/testuser";
+    // Note: Hadoop Path normalizes and removes trailing slashes
+    String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal";
+    
+    // Mock: Root directory exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new 
Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        return path.toString().equals(rootDir);
+      }
+    });
+    
+    // Config without JAR_CACHE_SUFFIX - should use default
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should use default suffix
+    Assert.assertEquals(result.toString(), expectedFullPath);
+  }
+
+  @Test
+  public void testResolveJarCachePath_EmptySuffixUsesDefault() throws 
IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String rootDir = "/user/testuser";
+    // Note: Hadoop Path normalizes and removes trailing slashes
+    String expectedFullPath = "/user/testuser/.gobblinCache/gobblin-temporal";
+    
+    // Mock: Root directory exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new 
Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        return path.toString().equals(rootDir);
+      }
+    });
+    
+    // Config with empty JAR_CACHE_SUFFIX - should use default
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, 
ConfigValueFactory.fromAnyRef(""))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should use default suffix when configured suffix is empty
+    Assert.assertEquals(result.toString(), expectedFullPath);
+  }
+
+  @Test
+  public void testResolveJarCachePath_SuffixNormalization() throws IOException 
{
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String rootDir = "/user/testuser";
+    String suffixWithLeadingSlash = 
"/.gobblinCache/gobblin-temporal/myproject";
+    String expectedFullPath = 
"/user/testuser/.gobblinCache/gobblin-temporal/myproject";
+    
+    // Mock: Root directory exists
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new 
Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        return path.toString().equals(rootDir);
+      }
+    });
+    
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, 
ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, 
ConfigValueFactory.fromAnyRef(suffixWithLeadingSlash))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+    
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+    
+    // Should normalize suffix by stripping leading '/' to avoid absolute path 
issue
+    Assert.assertEquals(result.toString(), expectedFullPath);
+  }
+
+}
+
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
index 1030e7b024..df37a0cf8a 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
@@ -71,10 +72,12 @@ public class YarnHelixUtilsTest {
 
   @Test
   public void testGetJarCachePath() throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
     Config config = ConfigFactory.empty()
         
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 ConfigValueFactory.fromAnyRef(1726074000013L))
-        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef("/tmp"));
-    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef("/tmp"))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true));
+    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config, 
mockFs);
 
     Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09"));
   }
@@ -84,8 +87,9 @@ public class YarnHelixUtilsTest {
     FileSystem fs = FileSystem.getLocal(new Configuration());
     Config config = ConfigFactory.empty()
         
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
 ConfigValueFactory.fromAnyRef(1726074000013L))
-        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp"));
-    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, 
ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp"))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
ConfigValueFactory.fromAnyRef(true));
+    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config, 
fs);
     fs.mkdirs(jarCachePath);
     fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08"));
     fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07"));

Reply via email to