This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 773da768f [GOBBLIN-2050] Add settings to allow for full cleanup in GobblinYarnAppLauncher (#3931)
773da768f is described below

commit 773da768f6c01b527a4dd74f4195e265c2eaf4df
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 25 11:05:34 2024 -0400

    [GOBBLIN-2050] Add settings to allow for full cleanup in GobblinYarnAppLauncher (#3931)
    
    Allow for explicit path definitions for token locations and work directories to allow easy cleanup after job completion
---
 .../cluster/GobblinClusterConfigurationKeys.java    |  5 +++++
 .../apache/gobblin/cluster/GobblinClusterUtils.java | 16 ++++++++++++++--
 .../apache/gobblin/yarn/GobblinYarnAppLauncher.java | 11 ++++++-----
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java  |  1 +
 .../gobblin/yarn/YarnContainerSecurityManager.java  | 21 ++++++++++++++++++---
 5 files changed, 44 insertions(+), 10 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 68b65b687..23df5d7bc 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -50,6 +50,11 @@ public class GobblinClusterConfigurationKeys {
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
   // Root working directory for Gobblin cluster
   public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + 
"workDir";
+  // Root working dir without appending the application name, keeping 
CLUSTER_WORK_DIR property for backward compatibility
+  // This is used in scenarios where we want to encapsulate multiple files 
inside of this work dir without coupling it to the YARN application
+  // Example: Yarn security token refresh location, gobblin cluster worker 
directories.
+  // However for concurrent jobs need to ensure that this property is distinct 
for each job otherwise it can lead to folder conflicts and pre-emptive deletion 
of files.
+  public static final String CLUSTER_EXACT_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + 
"exact.workDir";
 
   public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
   public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false;
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 205355b9a..c15d074d7 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -131,11 +131,14 @@ public class GobblinClusterUtils {
    */
   public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
       String applicationName, String applicationId) {
-    if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
+    if 
(config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR)) {
+      return new Path(new Path(fs.getUri()), 
config.getString(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR));
+    } else if 
(config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
       return new Path(new Path(fs.getUri()), 
PathUtils.combinePaths(config.getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR),
           getAppWorkDirPath(applicationName, applicationId)));
+    } else {
+      return new Path(fs.getHomeDirectory(), 
getAppWorkDirPath(applicationName, applicationId));
     }
-    return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, 
applicationId));
   }
 
   /**
@@ -254,4 +257,13 @@ public class GobblinClusterUtils {
         .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
         : FileSystem.get(conf);
   }
+
+  public static FileSystem createFileSystem(Config config, Configuration conf) 
throws IOException {
+    Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, 
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+    //Add any Hadoop-specific overrides into the Configuration object
+    
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
 conf);
+    return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+        
.newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+        : FileSystem.newInstance(conf);
+  }
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 10ad44fea..c7dd6e85c 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -870,8 +870,7 @@ public class GobblinYarnAppLauncher {
    * @throws IOException
    */
   private AbstractTokenRefresher buildTokenRefreshManager() throws IOException 
{
-    Path tokenFilePath = new Path(this.fs.getHomeDirectory(), 
this.applicationName + Path.SEPARATOR +
-        GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+    Path tokenFilePath = 
YarnContainerSecurityManager.getYarnTokenFilePath(this.config, this.fs);
     String securityManagerClassName = ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, 
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS);
 
     try {
@@ -892,10 +891,12 @@ public class GobblinYarnAppLauncher {
 
   @VisibleForTesting
   void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException 
{
-    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
-    if (this.fs.exists(appWorkDir)) {
+    // Create a new filesystem as this.fs may have been closed by the Yarn 
Application, and FS.get() will return a cached instance of the closed FS
+    FileSystem fs = GobblinClusterUtils.createFileSystem(this.config, 
this.yarnConfiguration);
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs, 
this.applicationName, applicationId.toString());
+    if (fs.exists(appWorkDir)) {
       LOGGER.info("Deleting application working directory " + appWorkDir);
-      this.fs.delete(appWorkDir, true);
+      fs.delete(appWorkDir, true);
     }
   }
 
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 947daa55c..1a5bc96c8 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -113,6 +113,7 @@ public class GobblinYarnConfigurationKeys {
   public static final String KEYTAB_FILE_PATH = GOBBLIN_YARN_PREFIX + 
"keytab.file.path";
   public static final String KEYTAB_PRINCIPAL_NAME = GOBBLIN_YARN_PREFIX + 
"keytab.principal.name";
   public static final String TOKEN_FILE_NAME = ".token";
+  public static final String TOKEN_FILE_PATH_KEY = GOBBLIN_YARN_PREFIX + 
"token.file.path";
   public static final String LOGIN_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + 
"login.interval.minutes";
   public static final Long DEFAULT_LOGIN_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
   public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = 
GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
index 3494daf0e..fa03390f3 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
@@ -24,6 +24,8 @@ import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import java.io.IOException;
+
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,9 +65,7 @@ public class YarnContainerSecurityManager extends 
AbstractIdleService {
 
   public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus 
eventBus, LogCopier logCopier) {
     this.fs = fs;
-    this.tokenFilePath = new Path(this.fs.getHomeDirectory(),
-        config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY) + 
Path.SEPARATOR
-            + GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+    this.tokenFilePath = getYarnTokenFilePath(config, fs);
     this.eventBus = eventBus;
     this.logCopier = logCopier;
   }
@@ -111,4 +111,19 @@ public class YarnContainerSecurityManager extends 
AbstractIdleService {
     }
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
   }
+
+  /**
+   * A utility method to get the location of the generated security token
+   * @param config - the configuration that contains the application name and 
the token file path
+   * @param fs - the Filesystem that stores the security token
+   * @return the path to the security token
+   */
+  static Path getYarnTokenFilePath(Config config, FileSystem fs) {
+    if (config.hasPath(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY)) {
+      return new 
Path(config.getString(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY), 
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+    }
+    // Default to storing the token file in the home directory of the user
+    return new Path(fs.getHomeDirectory(), 
PathUtils.combinePaths(config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY),
+        GobblinYarnConfigurationKeys.TOKEN_FILE_NAME));
+  }
 }

Reply via email to