This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee5b3efcf [GOBBLIN-2243]Add Fallback dir for Jar caching (#4160)
4ee5b3efcf is described below
commit 4ee5b3efcfe8e2604683fa8523a0ebd250eb1471
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Dec 18 11:48:41 2025 +0530
[GOBBLIN-2243]Add Fallback dir for Jar caching (#4160)
* added fallback caching dir
* addressed comments
* refactor
* resolved comments
* refactor
---
.../apache/gobblin/temporal/yarn/YarnService.java | 2 +-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 6 +-
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 6 +
.../apache/gobblin/yarn/JarCachePathResolver.java | 129 ++++++++++
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 20 +-
.../gobblin/yarn/JarCachePathResolverTest.java | 276 +++++++++++++++++++++
.../apache/gobblin/yarn/YarnHelixUtilsTest.java | 12 +-
7 files changed, 435 insertions(+), 16 deletions(-)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 2818982bab..d28d94e9c9 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -394,7 +394,7 @@ class YarnService extends AbstractIdleService {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, this.applicationId);
// Used for -SNAPSHOT versions of jars
Path containerJarsUnsharedDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
- Path jarCacheDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+ Path jarCacheDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
Path containerJarsCachedDir = new Path(jarCacheDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
LOGGER.info("Container execution-private jars root dir: " +
containerJarsUnsharedDir);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index d2f2180fff..4311e4ad77 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -608,7 +608,7 @@ public class GobblinYarnAppLauncher {
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(),
resource.getMemory())));
if (this.jarCacheEnabled) {
- Path jarCachePath =
YarnHelixUtils.calculatePerMonthJarCachePath(this.config);
+ Path jarCachePath =
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs);
// Retain at least the current and last month's jars to handle
executions running for ~30 days max
boolean cleanedSuccessfully =
YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
if (!cleanedSuccessfully) {
@@ -675,7 +675,7 @@ public class GobblinYarnAppLauncher {
private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId
applicationId) throws IOException {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
- Path jarsRootDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+ Path jarsRootDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
Path appMasterWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
Path appMasterJarsCacheDir = new Path(jarsRootDir,
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
@@ -730,7 +730,7 @@ public class GobblinYarnAppLauncher {
private void addContainerLocalResources(ApplicationId applicationId) throws
IOException {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName,
applicationId.toString());
- Path jarsRootDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+ Path jarsRootDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Path containerJarsRootDir = new Path(jarsRootDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
LOGGER.info("Configured Container work directory to: {}",
containerWorkDir);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 10bc9f9709..00af259a22 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -55,6 +55,12 @@ public class GobblinYarnConfigurationKeys {
public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX +
"jar.cache.dir";
+ public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX +
"jar.cache.root.dir";
+
+ public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX
+ "jar.cache.fallback.root.dir";
+
+ public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX +
"jar.cache.suffix";
+
public static final String YARN_APPLICATION_LIB_JAR_LIST =
GOBBLIN_YARN_PREFIX + "lib.jar.list";
/**
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java
new file mode 100644
index 0000000000..302791ab9e
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/JarCachePathResolver.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Utility class for resolving the jar cache directory path by validating root directories against the filesystem
+ * and applying fallback logic.
+ *
+ * <p>Resolution logic:</p>
+ * <ol>
+ *   <li>If JAR_CACHE_DIR is explicitly configured with a non-blank value, it is used as-is (for backward
+ *       compatibility) and no filesystem checks are made.</li>
+ *   <li>Otherwise, JAR_CACHE_ROOT_DIR is used if it is configured, non-blank, and exists on the filesystem.</li>
+ *   <li>If not, FALLBACK_JAR_CACHE_ROOT_DIR is tried under the same conditions.</li>
+ *   <li>The selected root is combined with JAR_CACHE_SUFFIX to form the final path. Surrounding whitespace and
+ *       leading slashes are stripped from the suffix; if nothing remains (or it is not configured), the default
+ *       suffix is used.</li>
+ *   <li>If no usable root directory is found, an IOException is thrown.</li>
+ * </ol>
+ */
+public class JarCachePathResolver {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JarCachePathResolver.class);
+  // Note: Trailing slash will be normalized away by Hadoop Path
+  private static final String DEFAULT_JAR_CACHE_SUFFIX = ".gobblinCache/gobblin-temporal";
+
+  // Root-directory config keys, in the order in which they are tried
+  private static final String[] ROOT_DIR_KEYS_IN_PRIORITY_ORDER = {
+      GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR,
+      GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR
+  };
+
+  // Private constructor to prevent instantiation
+  private JarCachePathResolver() {
+  }
+
+  /**
+   * Resolves the jar cache directory path, applying validation and fallback logic.
+   *
+   * @param config the configuration
+   * @param fs the filesystem used to check whether the configured root directories exist
+   * @return the resolved jar cache directory path
+   * @throws IOException if filesystem operations fail or no valid root directory is found
+   */
+  public static Path resolveJarCachePath(Config config, FileSystem fs) throws IOException {
+    // An explicitly configured JAR_CACHE_DIR wins and is used verbatim. A blank value is treated as unset,
+    // because new Path("") would otherwise fail with an IllegalArgumentException.
+    if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) {
+      String explicitCacheDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
+      if (!isBlank(explicitCacheDir)) {
+        LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", explicitCacheDir);
+        return new Path(explicitCacheDir);
+      }
+      LOGGER.warn("Configured {} is blank, ignoring it", GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
+    }
+
+    String suffix = resolveSuffix(config);
+
+    // Try the primary root first, then the fallback; the first one that exists wins
+    for (String rootDirKey : ROOT_DIR_KEYS_IN_PRIORITY_ORDER) {
+      if (!config.hasPath(rootDirKey)) {
+        continue;
+      }
+      String rootDir = config.getString(rootDirKey);
+      if (isBlank(rootDir)) {
+        // Skip instead of failing, so a blank primary root still falls through to the fallback root
+        LOGGER.warn("Configured {} is blank, skipping it", rootDirKey);
+        continue;
+      }
+      Path resolvedPath = validateAndComputePath(fs, rootDir, suffix, rootDirKey);
+      if (resolvedPath != null) {
+        return resolvedPath;
+      }
+    }
+
+    // No valid root directory found - fail
+    throw new IOException("No valid jar cache root directory found. Please configure "
+        + GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or "
+        + GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR
+        + " with a valid directory path, or explicitly set "
+        + GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
+  }
+
+  /**
+   * Reads JAR_CACHE_SUFFIX and normalizes it to a relative path. Falls back to the default suffix when the key is
+   * not configured or the value normalizes to an empty string (e.g. "", "  " or "/"), which would otherwise make
+   * new Path(root, "") throw.
+   *
+   * @param config the configuration
+   * @return a non-empty suffix without leading slashes
+   */
+  private static String resolveSuffix(Config config) {
+    String configuredSuffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, "");
+    String normalizedSuffix = configuredSuffix == null ? "" : stripLeadingSlashes(configuredSuffix.trim());
+    if (normalizedSuffix.isEmpty()) {
+      LOGGER.info("JAR_CACHE_SUFFIX not configured or empty, using default: {}", DEFAULT_JAR_CACHE_SUFFIX);
+      return DEFAULT_JAR_CACHE_SUFFIX;
+    }
+    return normalizedSuffix;
+  }
+
+  /**
+   * Validates that the root directory exists and computes the full path with the suffix appended.
+   *
+   * @param fs the filesystem to check
+   * @param rootDir the root directory to validate
+   * @param suffix the suffix to append; leading slashes are stripped so it always resolves relative to
+   *               {@code rootDir}, and a suffix that is empty after stripping yields {@code rootDir} itself
+   * @param configName the config name for logging
+   * @return the computed path if the root directory exists, null otherwise
+   * @throws IOException if filesystem operations fail
+   */
+  @VisibleForTesting
+  static Path validateAndComputePath(FileSystem fs, String rootDir, String suffix, String configName)
+      throws IOException {
+    Path rootPath = new Path(rootDir);
+    if (!fs.exists(rootPath)) {
+      LOGGER.warn("Configured {} does not exist: {}", configName, rootDir);
+      return null;
+    }
+    // Strip ALL leading '/' characters: Hadoop Path treats "/x" as absolute and ignores the parent, and
+    // stripping only one slash from "//x" would still leave an absolute path
+    String normalizedSuffix = stripLeadingSlashes(suffix);
+    Path fullPath = normalizedSuffix.isEmpty() ? rootPath : new Path(rootPath, normalizedSuffix);
+    LOGGER.info("{} exists: {}, resolved JAR_CACHE_DIR to: {}", configName, rootDir, fullPath);
+    return fullPath;
+  }
+
+  // Returns the given string without any leading '/' characters
+  private static String stripLeadingSlashes(String value) {
+    int start = 0;
+    while (start < value.length() && value.charAt(start) == '/') {
+      start++;
+    }
+    return value.substring(start);
+  }
+
+  // True for null, empty, or whitespace-only strings
+  private static boolean isBlank(String value) {
+    return value == null || value.trim().isEmpty();
+  }
+
+}
+
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index d38ebe52ee..1a4113dbee 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -209,15 +209,19 @@ public class YarnHelixUtils {
/**
* Calculate the path of a jar cache on HDFS, which is retained on a monthly
basis.
* Should be used in conjunction with {@link
#retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a
periodic basis
- * @param config
- * @return
- * @throws IOException
+ * @param config the configuration
+ * @param fs the filesystem to use for validation
+ * @return the monthly jar cache path
+ * @throws IOException if filesystem operations fail
*/
- public static Path calculatePerMonthJarCachePath(Config config) throws
IOException {
- Path jarsCacheDirMonthly = new
Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR));
- String monthSuffix = new
SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
- return new Path(jarsCacheDirMonthly, monthSuffix);
-
+ public static Path calculatePerMonthJarCachePath(Config config, FileSystem
fs) throws IOException {
+ // Use JarCachePathResolver to resolve the base jar cache directory
+ Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs);
+
+ // Append monthly suffix
+ String monthSuffix = new SimpleDateFormat("yyyy-MM").format(
+
config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
+ return new Path(baseCacheDir, monthSuffix);
}
/**
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java
new file mode 100644
index 0000000000..466543be1c
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/JarCachePathResolverTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Tests for {@link JarCachePathResolver}.
+ */
+public class JarCachePathResolverTest {
+
+  private static final String SUFFIX = ".gobblinCache/gobblin-temporal/myproject";
+  // Hadoop Path normalizes away trailing slashes, so the expected default suffix has none
+  private static final String DEFAULT_SUFFIX = ".gobblinCache/gobblin-temporal";
+
+  @Test
+  public void testResolveJarCachePath_ExplicitJarCacheDir() throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    String explicitCacheDir = "/explicit/cache/dir";
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(explicitCacheDir));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // Should use explicitly configured JAR_CACHE_DIR
+    Assert.assertEquals(result.toString(), explicitCacheDir);
+    // Verify no filesystem checks were made (explicit config is used as-is)
+    Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(Path.class));
+  }
+
+  @Test
+  public void testResolveJarCachePath_ExplicitJarCacheDirTakesPrecedenceOverRootDirs() throws IOException {
+    String explicitCacheDir = "/explicit/cache/dir";
+    String rootDir = "/user/testuser";
+    FileSystem mockFs = mockFsWithExistingDirs(rootDir);
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(explicitCacheDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(SUFFIX));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // JAR_CACHE_DIR wins even though a valid root directory is configured
+    Assert.assertEquals(result.toString(), explicitCacheDir);
+    Mockito.verify(mockFs, Mockito.never()).exists(Mockito.any(Path.class));
+  }
+
+  @Test
+  public void testResolveJarCachePath_RootDirExists() throws IOException {
+    String rootDir = "/user/testuser";
+    FileSystem mockFs = mockFsWithExistingDirs(rootDir);
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(SUFFIX));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // Should resolve to root + suffix
+    Assert.assertEquals(result.toString(), "/user/testuser/.gobblinCache/gobblin-temporal/myproject");
+  }
+
+  @Test
+  public void testResolveJarCachePath_RootDirPreferredWhenBothRootsExist() throws IOException {
+    String rootDir = "/user/primary";
+    String fallbackRootDir = "/user/fallback";
+    FileSystem mockFs = mockFsWithExistingDirs(rootDir, fallbackRootDir);
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR,
+            ConfigValueFactory.fromAnyRef(fallbackRootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(SUFFIX));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // Primary root wins, and the fallback root is never consulted
+    Assert.assertEquals(result.toString(), "/user/primary/.gobblinCache/gobblin-temporal/myproject");
+    Mockito.verify(mockFs, Mockito.never()).exists(new Path(fallbackRootDir));
+  }
+
+  @Test
+  public void testResolveJarCachePath_RootDirNotExistsFallbackRootExists() throws IOException {
+    String rootDir = "/user/baduser";
+    String fallbackRootDir = "/user/gooduser";
+    // Only the fallback root directory exists
+    FileSystem mockFs = mockFsWithExistingDirs(fallbackRootDir);
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR,
+            ConfigValueFactory.fromAnyRef(fallbackRootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(SUFFIX));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // Should resolve to fallback root + suffix
+    Assert.assertEquals(result.toString(), "/user/gooduser/.gobblinCache/gobblin-temporal/myproject");
+  }
+
+  @Test(expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = ".*No valid jar cache root directory found.*")
+  public void testResolveJarCachePath_NeitherRootDirExists() throws IOException {
+    // Neither root directory exists
+    FileSystem mockFs = mockFsWithExistingDirs();
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef("/user/baduser1"))
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR,
+            ConfigValueFactory.fromAnyRef("/user/baduser2"))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(SUFFIX));
+
+    // Should throw IOException when no valid root directory found
+    JarCachePathResolver.resolveJarCachePath(config, mockFs);
+  }
+
+  @Test
+  public void testResolveJarCachePath_OnlyFallbackRootConfigured() throws IOException {
+    String fallbackRootDir = "/user/testuser";
+    FileSystem mockFs = mockFsWithExistingDirs(fallbackRootDir);
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR,
+            ConfigValueFactory.fromAnyRef(fallbackRootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(SUFFIX));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // Should resolve to fallback root + suffix
+    Assert.assertEquals(result.toString(), "/user/testuser/.gobblinCache/gobblin-temporal/myproject");
+  }
+
+  @Test(expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = ".*No valid jar cache root directory found.*")
+  public void testResolveJarCachePath_NoRootDirsConfigured() throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+
+    // Should throw IOException when no root directories are configured
+    JarCachePathResolver.resolveJarCachePath(baseConfig(), mockFs);
+  }
+
+  @Test
+  public void testResolveJarCachePath_DefaultSuffix() throws IOException {
+    String rootDir = "/user/testuser";
+    FileSystem mockFs = mockFsWithExistingDirs(rootDir);
+
+    // Config without JAR_CACHE_SUFFIX - should use default
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    Assert.assertEquals(result.toString(), rootDir + "/" + DEFAULT_SUFFIX);
+  }
+
+  @Test
+  public void testResolveJarCachePath_EmptySuffixUsesDefault() throws IOException {
+    String rootDir = "/user/testuser";
+    FileSystem mockFs = mockFsWithExistingDirs(rootDir);
+
+    // Config with empty JAR_CACHE_SUFFIX - should use default
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef(""));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    Assert.assertEquals(result.toString(), rootDir + "/" + DEFAULT_SUFFIX);
+  }
+
+  @Test
+  public void testResolveJarCachePath_SuffixNormalization() throws IOException {
+    String rootDir = "/user/testuser";
+    FileSystem mockFs = mockFsWithExistingDirs(rootDir);
+
+    Config config = baseConfig()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR, ConfigValueFactory.fromAnyRef(rootDir))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, ConfigValueFactory.fromAnyRef("/" + SUFFIX));
+
+    Path result = JarCachePathResolver.resolveJarCachePath(config, mockFs);
+
+    // Leading '/' must be stripped, otherwise Hadoop Path would treat the suffix as absolute and drop the root
+    Assert.assertEquals(result.toString(), "/user/testuser/.gobblinCache/gobblin-temporal/myproject");
+  }
+
+  @Test
+  public void testValidateAndComputePath_MissingRootReturnsNull() throws IOException {
+    FileSystem mockFs = mockFsWithExistingDirs();
+
+    Path result = JarCachePathResolver.validateAndComputePath(mockFs, "/user/missing", SUFFIX,
+        GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
+
+    Assert.assertNull(result);
+  }
+
+  /**
+   * Returns the config shared by all tests: jar caching enabled and a launcher start time set.
+   */
+  private static Config baseConfig() {
+    return ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(true))
+        .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
+            ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+  }
+
+  /**
+   * Creates a mock {@link FileSystem} whose {@code exists} returns true only for the given directories
+   * (compared by {@link Path#toString()}), and false for every other path.
+   */
+  private static FileSystem mockFsWithExistingDirs(final String... existingDirs) throws IOException {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+    Mockito.when(mockFs.exists(Mockito.any(Path.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) {
+        Path path = invocation.getArgument(0);
+        for (String dir : existingDirs) {
+          if (path.toString().equals(dir)) {
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+    return mockFs;
+  }
+
+}
+
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
index 1030e7b024..df37a0cf8a 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
+import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -71,10 +72,12 @@ public class YarnHelixUtilsTest {
@Test
public void testGetJarCachePath() throws IOException {
+ FileSystem mockFs = Mockito.mock(FileSystem.class);
Config config = ConfigFactory.empty()
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726074000013L))
- .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef("/tmp"));
- Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef("/tmp"))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
+ Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
mockFs);
Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09"));
}
@@ -84,8 +87,9 @@ public class YarnHelixUtilsTest {
FileSystem fs = FileSystem.getLocal(new Configuration());
Config config = ConfigFactory.empty()
.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
ConfigValueFactory.fromAnyRef(1726074000013L))
- .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp"));
- Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp"))
+ .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
ConfigValueFactory.fromAnyRef(true));
+ Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config,
fs);
fs.mkdirs(jarCachePath);
fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08"));
fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07"));