This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 773da768f [GOBBLIN-2050] Add settings to allow for full cleanup in
GobblinYarnAppLauncher (#3931)
773da768f is described below
commit 773da768f6c01b527a4dd74f4195e265c2eaf4df
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 25 11:05:34 2024 -0400
[GOBBLIN-2050] Add settings to allow for full cleanup in
GobblinYarnAppLauncher (#3931)
Allow explicit paths to be configured for the token file location and the
work directory, so that both can be cleaned up easily after job completion
---
.../cluster/GobblinClusterConfigurationKeys.java | 5 +++++
.../apache/gobblin/cluster/GobblinClusterUtils.java | 16 ++++++++++++++--
.../apache/gobblin/yarn/GobblinYarnAppLauncher.java | 11 ++++++-----
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 1 +
.../gobblin/yarn/YarnContainerSecurityManager.java | 21 ++++++++++++++++++---
5 files changed, 44 insertions(+), 10 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 68b65b687..23df5d7bc 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -50,6 +50,11 @@ public class GobblinClusterConfigurationKeys {
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
// Root working directory for Gobblin cluster
public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX +
"workDir";
+ // Root working dir used as-is, without appending the application name; the CLUSTER_WORK_DIR property is kept for backward compatibility.
+ // This is used in scenarios where we want to encapsulate multiple files inside this work dir without coupling it to the YARN application.
+ // Examples: the YARN security token refresh location and the Gobblin cluster worker directories.
+ // However, for concurrent jobs this property must be distinct for each job; otherwise it can lead to folder conflicts and premature deletion of files.
+ public static final String CLUSTER_EXACT_WORK_DIR = GOBBLIN_CLUSTER_PREFIX +
"exact.workDir";
public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED =
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false;
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 205355b9a..c15d074d7 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -131,11 +131,14 @@ public class GobblinClusterUtils {
*/
public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
String applicationName, String applicationId) {
- if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
+ if
(config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR)) {
+ return new Path(new Path(fs.getUri()),
config.getString(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR));
+ } else if
(config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
return new Path(new Path(fs.getUri()),
PathUtils.combinePaths(config.getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR),
getAppWorkDirPath(applicationName, applicationId)));
+ } else {
+ return new Path(fs.getHomeDirectory(),
getAppWorkDirPath(applicationName, applicationId));
}
- return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName,
applicationId));
}
/**
@@ -254,4 +257,13 @@ public class GobblinClusterUtils {
.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
: FileSystem.get(conf);
}
+
+ public static FileSystem createFileSystem(Config config, Configuration conf)
throws IOException {
+ Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config,
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+ // Add any Hadoop-specific overrides into the Configuration object
+
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
conf);
+ return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+
.newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+ : FileSystem.newInstance(conf);
+ }
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 10ad44fea..c7dd6e85c 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -870,8 +870,7 @@ public class GobblinYarnAppLauncher {
* @throws IOException
*/
private AbstractTokenRefresher buildTokenRefreshManager() throws IOException
{
- Path tokenFilePath = new Path(this.fs.getHomeDirectory(),
this.applicationName + Path.SEPARATOR +
- GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+ Path tokenFilePath =
YarnContainerSecurityManager.getYarnTokenFilePath(this.config, this.fs);
String securityManagerClassName = ConfigUtils.getString(config,
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS,
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS);
try {
@@ -892,10 +891,12 @@ public class GobblinYarnAppLauncher {
@VisibleForTesting
void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException
{
- Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
- if (this.fs.exists(appWorkDir)) {
+ // Create a new FileSystem instance because this.fs may have been closed by the YARN application, and FileSystem.get() would return the cached, already-closed instance
+ FileSystem fs = GobblinClusterUtils.createFileSystem(this.config,
this.yarnConfiguration);
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs,
this.applicationName, applicationId.toString());
+ if (fs.exists(appWorkDir)) {
LOGGER.info("Deleting application working directory " + appWorkDir);
- this.fs.delete(appWorkDir, true);
+ fs.delete(appWorkDir, true);
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 947daa55c..1a5bc96c8 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -113,6 +113,7 @@ public class GobblinYarnConfigurationKeys {
public static final String KEYTAB_FILE_PATH = GOBBLIN_YARN_PREFIX +
"keytab.file.path";
public static final String KEYTAB_PRINCIPAL_NAME = GOBBLIN_YARN_PREFIX +
"keytab.principal.name";
public static final String TOKEN_FILE_NAME = ".token";
+ public static final String TOKEN_FILE_PATH_KEY = GOBBLIN_YARN_PREFIX +
"token.file.path";
public static final String LOGIN_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX +
"login.interval.minutes";
public static final Long DEFAULT_LOGIN_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES =
GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
index 3494daf0e..fa03390f3 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
@@ -24,6 +24,8 @@ import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import java.io.IOException;
+
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
@@ -63,9 +65,7 @@ public class YarnContainerSecurityManager extends
AbstractIdleService {
public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus
eventBus, LogCopier logCopier) {
this.fs = fs;
- this.tokenFilePath = new Path(this.fs.getHomeDirectory(),
- config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY) +
Path.SEPARATOR
- + GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+ this.tokenFilePath = getYarnTokenFilePath(config, fs);
this.eventBus = eventBus;
this.logCopier = logCopier;
}
@@ -111,4 +111,19 @@ public class YarnContainerSecurityManager extends
AbstractIdleService {
}
UserGroupInformation.getCurrentUser().addCredentials(credentials);
}
+
+ /**
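+ * Returns the location of the generated security token file: {@code <TOKEN_FILE_PATH_KEY value>/.token} when that key is configured, otherwise {@code <fs home directory>/<application name>/.token}.
+ * @param config - the configuration that contains the application name and, optionally, the token file path
+ * @param fs - the FileSystem whose home directory is used when no token file path is configured
+ * @return the path to the security token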
+ */
+ static Path getYarnTokenFilePath(Config config, FileSystem fs) {
+ if (config.hasPath(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY)) {
+ return new
Path(config.getString(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY),
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+ }
+ // Default to storing the token file under <user home directory>/<application name>/
+ return new Path(fs.getHomeDirectory(),
PathUtils.combinePaths(config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY),
+ GobblinYarnConfigurationKeys.TOKEN_FILE_NAME));
+ }
}