This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a07a4f190d [GOBBLIN-2135] Cache Gobblin YARN application jars (#4030)
a07a4f190d is described below
commit a07a4f190d7128f48ba3d956efd4c1b9cf9158f4
Author: William Lo <[email protected]>
AuthorDate: Tue Sep 17 17:15:03 2024 -0400
[GOBBLIN-2135] Cache Gobblin YARN application jars (#4030)
* Implement caching for the Gobblin YARN app launcher so that jobs do not
repeatedly upload jars and files to HDFS
---
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 45 +-------
.../apache/gobblin/temporal/yarn/YarnService.java | 28 ++++-
.../util/filesystem/HdfsJarUploadUtils.java | 85 +++++++++++++++
.../gobblin/yarn/GobblinYarnAppLauncher.java | 117 +++++++++++++--------
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 9 ++
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 38 +++++++
.../apache/gobblin/yarn/YarnHelixUtilsTest.java | 48 +++++++++
7 files changed, 279 insertions(+), 91 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 7b01dc1a6b..b5e7f09740 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -114,6 +114,7 @@ import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job as a
Hadoop MR job.
@@ -154,8 +155,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
// Configuration that make uploading of jar files more reliable,
// since multiple Gobblin Jobs are sharing the same jar directory.
private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
- private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
-
public static final String MR_TYPE_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
public static final String MAPPER_TASK_NUM_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
public static final String MAPPER_TASK_ATTEMPT_NUM_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX +
"reporting.mapper.task.attempt.num";
@@ -572,56 +571,22 @@ public class MRJobLauncher extends AbstractJobLauncher {
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
for (FileStatus status : fileStatusList) {
+ Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs,
status.getPath().getName(), this.unsharedJarsDir, jarFileDir);
// For each FileStatus there are chances it could fail in copying at
the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin
jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too
far and causes job hanging.
- int retryCount = 0;
- boolean shouldFileBeAddedIntoDC = true;
- Path destJarFile = calculateDestJarFile(status, jarFileDir);
- // Adding destJarFile into HDFS until it exists and the size of file
on targetPath matches the one on local path.
- while (!this.fs.exists(destJarFile) ||
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- try {
- if (this.fs.exists(destJarFile) &&
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
- throw new IOException("Waiting for file to complete on uploading
... ");
- }
- // Set the first parameter as false for not deleting sourceFile
- // Set the second parameter as false for not overwriting existing
file on the target, by default it is true.
- // If the file is preExisted but overwrite flag set to false, then
an IOException if thrown.
- this.fs.copyFromLocalFile(false, false, status.getPath(),
destJarFile);
- } catch (IOException | InterruptedException e) {
- LOG.warn("Path:" + destJarFile + " is not copied successfully.
Will require retry.");
- retryCount += 1;
- if (retryCount >= this.jarFileMaximumRetry) {
- LOG.error("The jar file:" + destJarFile + "failed in being
copied into hdfs", e);
- // If retry reaches upper limit, skip copying this file.
- shouldFileBeAddedIntoDC = false;
- break;
- }
- }
- }
- if (shouldFileBeAddedIntoDC) {
+ if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status,
this.jarFileMaximumRetry, destJarFile)) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+ } else {
+ LOG.error("Failed to upload jar file: " + status.getPath());
}
}
}
}
- /**
- * Calculate the target filePath of the jar file to be copied on HDFS,
- * given the {@link FileStatus} of a jarFile and the path of directory that
contains jar.
- */
- private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
- // SNAPSHOT jars should not be shared, as different jobs may be using
different versions of it
- Path baseDir = status.getPath().getName().contains("SNAPSHOT") ?
this.unsharedJarsDir : jarFileDir;
- // DistributedCache requires absolute path, so we need to use
makeQualified.
- return new Path(this.fs.makeQualified(baseDir),
status.getPath().getName());
- }
-
/**
* Add local non-jar files the job depends on to DistributedCache.
*/
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index b124063bcd..426521b921 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -197,6 +197,8 @@ class YarnService extends AbstractIdleService {
private volatile boolean shutdownInProgress = false;
+ private final boolean jarCacheEnabled;
+
public YarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
this.applicationName = applicationName;
@@ -270,6 +272,7 @@ class YarnService extends AbstractIdleService {
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
+ this.jarCacheEnabled = ConfigUtils.getBoolean(this.config,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
}
@SuppressWarnings("unused")
@@ -484,12 +487,30 @@ class YarnService extends AbstractIdleService {
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
throws IOException {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, this.applicationId);
+ // Used for -SNAPSHOT versions of jars
+ Path containerJarsUnsharedDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ Path jarCacheDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
+ Path containerJarsCachedDir = new Path(jarCacheDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
+ LOGGER.info("Container execution-private jars root dir: " +
containerJarsUnsharedDir);
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
- Map<String, LocalResource> resourceMap = Maps.newHashMap();
+ Map<String, LocalResource> resourceMap = Maps.newHashMap();
+ // Always fetch any jars from the appWorkDir for any potential snapshot
jars
addContainerLocalResources(new Path(appWorkDir,
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
- addContainerLocalResources(new Path(containerWorkDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
+ if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
+ addContainerLocalResources(new Path(containerJarsUnsharedDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
+ resourceMap);
+ }
+ if (this.jarCacheEnabled) {
+ addContainerLocalResources(new Path(jarCacheDir,
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
+ if
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
+ addContainerLocalResources(new Path(containerJarsCachedDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
+ resourceMap);
+ }
+ }
+
addContainerLocalResources(
new Path(containerWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);
@@ -579,8 +600,6 @@ class YarnService extends AbstractIdleService {
containerCommand.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
.append(" ").append(helixInstanceTag);
}
-
- LOGGER.info("Building " + containerProcessName);
return containerCommand.append("
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append("
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
@@ -797,7 +816,6 @@ class YarnService extends AbstractIdleService {
public void run() {
try {
LOGGER.info("Starting container " + containerId);
-
nmClientAsync.startContainerAsync(container,
newContainerLaunchContext(containerInfo));
} catch (IOException ioe) {
LOGGER.error("Failed to start container " + containerId, ioe);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
new file mode 100644
index 0000000000..31128a1cf1
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle concurrency, since multiple
+ * jobs may upload the same jar into a shared directory at the same time.
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+  private static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000;
+
+  /** Sentinel returned by {@link #getLengthIfExists} when the destination path does not exist. */
+  private static final long ABSENT_FILE_LENGTH = -1L;
+
+  /**
+   * Calculate the fully-qualified path on HDFS that a jar should be uploaded to.
+   * Jars whose name contains {@code SNAPSHOT} go to {@code unsharedJarsDir}, because different jobs may be
+   * using different builds of the same SNAPSHOT version. All other jars go to the shared {@code jarCacheDir}.
+   *
+   * @param fs the destination file system, used to qualify the directory (DistributedCache requires absolute paths)
+   * @param jarName the file name of the jar, without any directory component
+   * @param unsharedJarsDir directory for jars that must not be shared between executions
+   * @param jarCacheDir directory for jars that may be shared between executions
+   * @return the qualified destination path {@code <chosen dir>/<jarName>}
+   * @throws IOException if the file system reports an error
+   */
+  public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir)
+      throws IOException {
+    Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
+    return new Path(fs.makeQualified(uploadDir), jarName);
+  }
+
+  /**
+   * Upload a local jar to HDFS, unless a file with the same length already exists at {@code destJarFile}.
+   *
+   * <p>The copy never overwrites an existing destination. If the destination exists with a different length,
+   * another job is assumed to still be uploading it. This method then waits
+   * {@code WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS} (3000 ms) and re-checks. Each failed copy and each
+   * wait counts as one failed attempt.
+   *
+   * <p>Limitations:
+   * <ul>
+   *   <li>Only the file length is compared, so a different jar with the same name and size is treated as
+   *   already uploaded.</li>
+   *   <li>A partially written file that is never completed (for example, because its uploader crashed) is
+   *   never replaced. In that case this method returns {@code false} once {@code maxAttempts} is reached.</li>
+   * </ul>
+   *
+   * @param fs the destination file system
+   * @param localJar status of the local jar to upload
+   * @param maxAttempts number of failed attempts tolerated before giving up
+   * @param destJarFile qualified destination path, e.g. from {@link #calculateDestJarFilePath}
+   * @return {@code true} if {@code destJarFile} exists with the same length as {@code localJar}.
+   *         {@code false} if the attempts were exhausted, or if the thread was interrupted while waiting
+   *         (the interrupt flag is restored in that case).
+   * @throws IOException if checking the destination fails for a reason other than the file not existing
+   */
+  public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int maxAttempts, Path destJarFile)
+      throws IOException {
+    long expectedLen = localJar.getLen();
+    int failedAttempts = 0;
+    long destLen;
+    while ((destLen = getLengthIfExists(fs, destJarFile)) != expectedLen) {
+      IOException failure;
+      if (destLen != ABSENT_FILE_LENGTH) {
+        // Size mismatch: most likely another job is still uploading this jar to the shared directory.
+        try {
+          Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS);
+        } catch (InterruptedException e) {
+          // Preserve the interrupt for the caller. Retrying would only spin, since every sleep now throws.
+          Thread.currentThread().interrupt();
+          log.error("Interrupted while waiting for the upload of jar file: " + destJarFile + " to complete", e);
+          return false;
+        }
+        failure = new IOException("Waiting for file to complete on uploading ... ");
+      } else {
+        try {
+          boolean deleteSourceFile = false;
+          // An IOException is thrown if the destination already exists, e.g. another job created it concurrently.
+          boolean overwriteAnyExistingDestFile = false;
+          fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile);
+          continue; // re-verify the destination length in the loop condition
+        } catch (IOException e) {
+          failure = e;
+        }
+      }
+      log.warn("Path:" + destJarFile + " is not copied successfully. Will require retry. Cause: "
+          + failure.getMessage());
+      failedAttempts++;
+      if (failedAttempts >= maxAttempts) {
+        // If retries reach the upper limit, skip copying this file.
+        log.error("The jar file: " + destJarFile + " failed in being copied into hdfs", failure);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Return the length of {@code path}, or {@link #ABSENT_FILE_LENGTH} if it does not exist.
+   * A single {@code getFileStatus} call avoids the race where the file disappears between an
+   * {@code exists} check and a follow-up {@code getFileStatus}.
+   */
+  private static long getLengthIfExists(FileSystem fs, Path path) throws IOException {
+    try {
+      return fs.getFileStatus(path).getLen();
+    } catch (java.io.FileNotFoundException e) {
+      return ABSENT_FILE_LENGTH;
+    }
+  }
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index c7dd6e85c1..4ce40f21df 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -108,6 +108,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
@@ -162,7 +163,7 @@ public class GobblinYarnAppLauncher {
public static final String GOBBLIN_YARN_CONFIG_OUTPUT_PATH =
"gobblin.yarn.configOutputPath";
//Configuration key to signal the GobblinYarnAppLauncher mode
- public static final String GOBBLIN_YARN_APP_LAUNCHER_MODE =
"gobblin.yarn.appLauncherMode";
+ public static final String GOBBLIN_YARN_APP_LAUNCHER_MODE =
"gobblin.yarn.appLauncherMode";
public static final String DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE = "";
public static final String AZKABAN_APP_LAUNCHER_MODE_KEY = "azkaban";
@@ -238,9 +239,13 @@ public class GobblinYarnAppLauncher {
protected final String originalYarnRMAddress;
protected final Map<String, YarnClient> potentialYarnClients = new
HashMap<>();
private YarnClient yarnClient;
+ private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
+
+ private final boolean jarCacheEnabled;
public GobblinYarnAppLauncher(Config config, YarnConfiguration
yarnConfiguration) throws IOException {
- this.config = config;
+ this.config =
config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
+ ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
this.applicationName =
config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY);
this.appQueueName =
config.getString(GobblinYarnConfigurationKeys.APP_QUEUE_KEY);
@@ -254,7 +259,6 @@ public class GobblinYarnAppLauncher {
this.fs = GobblinClusterUtils.buildFileSystem(config,
this.yarnConfiguration);
this.closer.register(this.fs);
-
boolean isHelixEnabled = ConfigUtils.getBoolean(config,
GobblinYarnConfigurationKeys.HELIX_ENABLED,
GobblinYarnConfigurationKeys.DEFAULT_HELIX_ENABLED);
this.helixClusterLifecycleManager = isHelixEnabled
@@ -300,10 +304,10 @@ public class GobblinYarnAppLauncher {
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.detachOnExitEnabled = ConfigUtils
- .getBoolean(config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
+ .getBoolean(this.config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT);
- this.appLauncherMode = ConfigUtils.getString(config,
GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
-
+ this.appLauncherMode = ConfigUtils.getString(this.config,
GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
+ this.jarCacheEnabled = ConfigUtils.getBoolean(this.config,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
try {
config = addDynamicConfig(config);
@@ -334,7 +338,7 @@ public class GobblinYarnAppLauncher {
this.eventBus.register(this);
//Before connect with yarn client, we need to login to get the token
- if(ConfigUtils.getBoolean(config,
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
+ if (ConfigUtils.getBoolean(config,
GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
this.tokenRefreshManager = Optional.of(buildTokenRefreshManager());
this.tokenRefreshManager.get().loginAndScheduleTokenRenewal();
}
@@ -370,7 +374,7 @@ public class GobblinYarnAppLauncher {
return this.applicationId.isPresent() && !this.applicationCompleted &&
!this.detachOnExitEnabled;
}
- private void addServices() throws IOException{
+ private void addServices() throws IOException {
List<Service> services = Lists.newArrayList();
if (this.tokenRefreshManager.isPresent()) {
LOGGER.info("Adding KeyManagerService since key management is enabled");
@@ -396,7 +400,7 @@ public class GobblinYarnAppLauncher {
LOGGER.warn("NOT starting the admin UI because the job execution info
server is NOT enabled");
}
- if (services.size() > 0 ) {
+ if (services.size() > 0) {
this.serviceManager = Optional.of(new ServiceManager(services));
this.serviceManager.get().startAsync();
} else {
@@ -428,7 +432,6 @@ public class GobblinYarnAppLauncher {
if (!this.detachOnExitEnabled) {
LOGGER.info("Disabling all live Helix instances..");
}
-
} finally {
try {
if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
@@ -561,7 +564,7 @@ public class GobblinYarnAppLauncher {
/**
* Setup and submit the Gobblin Yarn application.
- *
+ * Retain at least the current and last month's jars in the cache (if
configured) to handle executions running for ~30 days max
* @throws IOException if there's anything wrong setting up and submitting
the Yarn application
* @throws YarnException if there's anything wrong setting up and submitting
the Yarn application
*/
@@ -573,7 +576,7 @@ public class GobblinYarnAppLauncher {
appSubmissionContext.setApplicationType(GOBBLIN_YARN_APPLICATION_TYPE);
appSubmissionContext.setMaxAppAttempts(ConfigUtils.getInt(config,
GobblinYarnConfigurationKeys.APP_MASTER_MAX_ATTEMPTS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_APP_MASTER_MAX_ATTEMPTS_KEY));
ApplicationId applicationId = appSubmissionContext.getApplicationId();
- LOGGER.info("created new yarn application: "+ applicationId.getId());
+ LOGGER.info("created new yarn application: " + applicationId.getId());
GetNewApplicationResponse newApplicationResponse =
gobblinYarnApp.getNewApplicationResponse();
// Set up resource type requirements for ApplicationMaster
@@ -587,6 +590,15 @@ public class GobblinYarnAppLauncher {
amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(),
resource.getMemory())));
+ if (this.jarCacheEnabled) {
+ Path jarCachePath =
YarnHelixUtils.calculatePerMonthJarCachePath(this.config);
+ // Retain at least the current and last month's jars to handle
executions running for ~30 days max
+ boolean cleanedSuccessfully =
YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
+ if (!cleanedSuccessfully) {
+ LOGGER.warn("Failed to delete older jar cache directories");
+ }
+ }
+
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
amContainerLaunchContext.setApplicationACLs(acls);
@@ -640,31 +652,36 @@ public class GobblinYarnAppLauncher {
private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId
applicationId) throws IOException {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
+ Path jarsRootDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path appMasterWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
- LOGGER.info("Configured GobblinApplicationMaster work directory to: {}",
appMasterWorkDir.toString());
+ Path appMasterJarsCacheDir = new Path(jarsRootDir,
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
+ LOGGER.info("Configured GobblinApplicationMaster work directory to: {}",
appMasterWorkDir);
+ LOGGER.info("Configured GobblinApplicationMaster jars directory to: {}",
appMasterJarsCacheDir);
Map<String, LocalResource> appMasterResources = Maps.newHashMap();
FileSystem localFs = FileSystem.getLocal(new Configuration());
- // NOTE: log after each step below for insight into what takes bulk of time
if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
- Path libJarsDestDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+ // Lib jars are shared between all containers, store at the root level
+ Path libJarsDestDir = new Path(jarsRootDir,
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+ Path unsharedJarsDestDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
addLibJars(new
Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)),
- Optional.of(appMasterResources), libJarsDestDir, localFs);
- LOGGER.info("Added lib jars to directory: {}",
libJarsDestDir.toString());
+ Optional.of(appMasterResources), libJarsDestDir,
unsharedJarsDestDir, localFs);
+ LOGGER.info("Added lib jars to directory: {} and execution-private
directory: {}", libJarsDestDir, unsharedJarsDestDir);
}
if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY))
{
- Path appJarsDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+ Path appJarsDestDir = new Path(appMasterJarsCacheDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+ Path unsharedJarsDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
addAppJars(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY),
- Optional.of(appMasterResources), appJarsDestDir, localFs);
- LOGGER.info("Added app jars to directory: {}",
appJarsDestDir.toString());
+ Optional.of(appMasterResources), appJarsDestDir,
unsharedJarsDestDir, localFs);
+ LOGGER.info("Added app jars to directory: {} and execution-private
directory: {}", appJarsDestDir, unsharedJarsDestDir);
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY)) {
- Path appFilesDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
+ Path appFilesDestDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY),
Optional.of(appMasterResources), appFilesDestDir, localFs);
- LOGGER.info("Added app local files to directory: {}",
appFilesDestDir.toString());
+ LOGGER.info("Added app local files to directory: {}", appFilesDestDir);
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY))
{
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY),
@@ -677,27 +694,30 @@ public class GobblinYarnAppLauncher {
LOGGER.info("Added remote zips to local resources");
}
if
(this.config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) {
- Path appFilesDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
+ Path appFilesDestDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
addJobConfPackage(this.config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY),
appFilesDestDir,
appMasterResources);
- LOGGER.info("Added job conf package to directory: {}",
appFilesDestDir.toString());
+ LOGGER.info("Added job conf package to directory: {}", appFilesDestDir);
}
return appMasterResources;
}
private void addContainerLocalResources(ApplicationId applicationId) throws
IOException {
- Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
-
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName,
+ applicationId.toString());
+ Path jarsRootDir = this.jarCacheEnabled ?
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
- LOGGER.info("Configured Container work directory to: {}",
containerWorkDir.toString());
-
+ Path containerJarsRootDir = new Path(jarsRootDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ LOGGER.info("Configured Container work directory to: {}",
containerWorkDir);
+ LOGGER.info("Configured Container jars directory to: {}",
containerJarsRootDir);
FileSystem localFs = FileSystem.getLocal(new Configuration());
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
- Path appJarsDestDir = new Path(containerWorkDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+ Path appJarsDestDir = new Path(containerJarsRootDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
+ Path unsharedJarsDestDir = new Path(containerWorkDir,
GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME);
addAppJars(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY),
- Optional.<Map<String, LocalResource>>absent(), appJarsDestDir,
localFs);
+ Optional.<Map<String, LocalResource>>absent(), appJarsDestDir,
unsharedJarsDestDir, localFs);
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
Path appFilesDestDir = new Path(containerWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
@@ -706,7 +726,7 @@ public class GobblinYarnAppLauncher {
}
}
- private void addLibJars(Path srcLibJarDir, Optional<Map<String,
LocalResource>> resourceMap, Path destDir,
+ private void addLibJars(Path srcLibJarDir, Optional<Map<String,
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
FileSystem localFs) throws IOException {
// Missing classpath-jars will be a fatal error.
@@ -720,26 +740,26 @@ public class GobblinYarnAppLauncher {
}
for (FileStatus libJarFile : libJarFiles) {
- Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
- this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
- if (resourceMap.isPresent()) {
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs,
libJarFile.getPath().getName(), unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile,
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent())
{
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ } else {
+ LOGGER.warn("Failed to upload jar file {} to HDFS",
libJarFile.getPath());
}
}
}
-
- private void addAppJars(String jarFilePathList, Optional<Map<String,
LocalResource>> resourceMap,
- Path destDir, FileSystem localFs) throws IOException {
+ private void addAppJars(String jarFilePathList, Optional<Map<String,
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+ FileSystem localFs) throws IOException {
for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
Path srcFilePath = new Path(jarFilePath);
- Path destFilePath = new Path(destDir, srcFilePath.getName());
- if (localFs.exists(srcFilePath)) {
- this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+ FileStatus localJar = localFs.getFileStatus(srcFilePath);
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs,
localJar.getPath().getName(), unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar,
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+ if (resourceMap.isPresent()) {
+ YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ }
} else {
- LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
- }
- if (resourceMap.isPresent()) {
- YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
}
}
}
@@ -751,7 +771,11 @@ public class GobblinYarnAppLauncher {
Path srcFilePath = new Path(localFilePath);
Path destFilePath = new Path(destDir, srcFilePath.getName());
if (localFs.exists(srcFilePath)) {
- this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+ if (this.fs.exists(destFilePath)) {
+ LOGGER.info("The destination app jar {} already exists, skipping
upload", destFilePath);
+ } else {
+ this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+ }
if (resourceMap.isPresent()) {
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
}
@@ -789,6 +813,7 @@ public class GobblinYarnAppLauncher {
.append("
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(logFileName).append(".").append(ApplicationConstants.STDOUT)
+ .append("
-D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY))
.append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
.append(" ").append(appMasterClass.getName())
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
@@ -822,7 +847,7 @@ public class GobblinYarnAppLauncher {
// Only pass token here and no secrets. (since there is no simple way to
remove single token/ get secrets)
// For RM token, only pass the RM token for the current RM, or the RM will
fail to update the token
Credentials finalCredentials = new Credentials();
- for (Token<? extends TokenIdentifier> token: credentials.getAllTokens()) {
+ for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) &&
!token.getService().equals(new Text(this.originalYarnRMAddress))) {
continue;
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 1a5bc96c86..93cc23dba6 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -49,6 +49,15 @@ public class GobblinYarnConfigurationKeys {
public static final String YARN_RESOURCE_MANAGER_IDS =
YARN_RESOURCE_MANAGER_PREFIX + "ids";
public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES=
"other.yarn.resourcemanager.addresses";
+ /**
+  * Whether the Gobblin YARN app launcher caches the jars and files it uploads to HDFS, so that
+  * launches do not repeatedly upload them. See {@link #JAR_CACHE_ENABLED_DEFAULT} for the intended default.
+  */
+ public static final String JAR_CACHE_ENABLED = GOBBLIN_YARN_PREFIX +
"jar.cache.enabled";
+
+ /** Intended default value for {@link #JAR_CACHE_ENABLED}: caching is off unless explicitly enabled. */
+ public static final boolean JAR_CACHE_ENABLED_DEFAULT = false;
+
+ /**
+  * Root directory of the jar cache. YarnHelixUtils#calculatePerMonthJarCachePath resolves the cache path
+  * for a launch as a {@code yyyy-MM} subdirectory of this root.
+  */
+ public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX +
"jar.cache.dir";
+
+ // Used to store the start time of the app launcher to propagate to workers and appmaster.
+ // The value is read as a long in epoch milliseconds and is used to derive the monthly jar cache directory.
+ public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY =
GOBBLIN_YARN_PREFIX + "application.start.time";
+
// Gobblin Yarn ApplicationMaster configuration properties.
public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX +
"app.master.memory.mbs";
public static final String APP_MASTER_CORES_KEY = GOBBLIN_YARN_PREFIX +
"app.master.cores";
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 72f9cc3363..4a74da44a5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -20,10 +20,14 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -199,6 +203,40 @@ public class YarnHelixUtils {
}
}
+  /**
+   * Calculates the path of the jar cache on HDFS for the month in which the app launcher started.
+   * The result is {@code <JAR_CACHE_DIR>/<yyyy-MM>}, so a fresh cache directory is used every month.
+   * Should be used in conjunction with {@link #retainKLatestJarCachePaths(Path, int, FileSystem)} to clean up
+   * old monthly caches on a periodic basis.
+   *
+   * <p>The month is computed in UTC, not in the JVM default time zone. The launcher start time is propagated
+   * to the application master and workers, whose JVMs may be launched with a different time zone. Formatting
+   * in the local zone could make those processes resolve a different month directory around a month boundary.
+   *
+   * @param config config providing {@link GobblinYarnConfigurationKeys#JAR_CACHE_DIR} and
+   *     {@link GobblinYarnConfigurationKeys#YARN_APPLICATION_LAUNCHER_START_TIME_KEY} (epoch milliseconds)
+   * @return the per-month jar cache directory
+   * @throws IOException declared for API compatibility; not thrown by the current implementation
+   */
+  public static Path calculatePerMonthJarCachePath(Config config) throws IOException {
+    Path jarCacheRootDir = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR));
+    long launcherStartTimeMillis =
+        config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY);
+    // Format in UTC so the launcher, AM and workers all derive the same month regardless of their time zone.
+    String monthSuffix = DateTimeFormatter.ofPattern("yyyy-MM")
+        .withZone(ZoneOffset.UTC)
+        .format(Instant.ofEpochMilli(launcherStartTimeMillis));
+    return new Path(jarCacheRootDir, monthSuffix);
+  }
+
+  /**
+   * Retains the {@code k} latest jar cache paths that are direct children of {@code parentCachePath} and
+   * recursively deletes all older ones. Children are ordered by name. For the {@code yyyy-MM} directories
+   * produced by {@link #calculatePerMonthJarCachePath(Config)}, name order is chronological order.
+   *
+   * @param parentCachePath the jar cache root; a missing directory is treated as empty
+   * @param k the number of latest jar cache paths to retain; must be non-negative
+   * @param fs the file system holding the cache
+   * @return {@code true} if every attempted delete succeeded (or nothing needed deleting), {@code false} otherwise
+   * @throws IllegalArgumentException if {@code k} is negative
+   * @throws IOException if checking, listing or deleting paths fails
+   */
+  public static boolean retainKLatestJarCachePaths(Path parentCachePath, int k, FileSystem fs) throws IOException {
+    if (k < 0) {
+      throw new IllegalArgumentException("Number of jar cache paths to retain must be non-negative, got " + k);
+    }
+    if (!fs.exists(parentCachePath)) {
+      return true;
+    }
+    // Sort explicitly by name (oldest first). "yyyy-MM" names sort lexicographically in chronological order.
+    List<FileStatus> jarDirs = Arrays.stream(fs.listStatus(parentCachePath))
+        .sorted(Comparator.comparing((FileStatus status) -> status.getPath().getName()))
+        .collect(Collectors.toList());
+    boolean deletesSuccessful = true;
+    int numToDelete = jarDirs.size() - k;
+    for (int i = 0; i < numToDelete; i++) {
+      // Non-short-circuiting &= so every stale path is attempted even if an earlier delete fails.
+      deletesSuccessful &= fs.delete(jarDirs.get(i).getPath(), true);
+    }
+    return deletesSuccessful;
+  }
+
public static void addRemoteFilesToLocalResources(String hdfsFileList,
Map<String, LocalResource> resourceMap, Configuration yarnConfiguration) throws
IOException {
for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) {
Path srcFilePath = new Path(hdfsFilePath);
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
index 033a24aa90..c271258c1d 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java
@@ -18,14 +18,22 @@ package org.apache.gobblin.yarn;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
public class YarnHelixUtilsTest {
/**
@@ -33,6 +41,14 @@ public class YarnHelixUtilsTest {
* added to the resources folder.
* @throws IOException
*/
+  // NOTE(review): the Javadoc immediately above describes testUpdateToken, not this field; it should be moved
+  // back next to that method.
+  // Per-instance scratch directory on the local file system, removed in tearDown().
+  private final String tempDir = Files.createTempDir().getPath();
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    // Recursive delete of the scratch directory; the boolean result is intentionally not checked.
+    fs.delete(new Path(this.tempDir), true);
+  }
+
@Test
public void testUpdateToken()
throws IOException {
@@ -51,4 +67,36 @@ public class YarnHelixUtilsTest {
Token<?> readToken = credentials.getToken(new Text("testService"));
Assert.assertNotNull(readToken);
}
+
+  @Test
+  public void testGetJarCachePath() throws IOException {
+    // Launcher start time on 2024-09-11, far from any month boundary.
+    long launcherStartTime = 1726074000013L;
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef("/tmp"))
+        .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
+            ConfigValueFactory.fromAnyRef(launcherStartTime));
+
+    Path expectedMonthlyCache = new Path("/tmp/2024-09");
+    Assert.assertEquals(expectedMonthlyCache, YarnHelixUtils.calculatePerMonthJarCachePath(config));
+  }
+
+  @Test
+  public void retainLatestKJarCachePaths() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY,
+            ConfigValueFactory.fromAnyRef(1726074000013L))
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp"));
+    Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config);
+    Path jarCacheRoot = jarCachePath.getParent();
+
+    // Set up the current month's cache plus three older monthly caches; fail fast if setup itself fails.
+    Assert.assertTrue(fs.mkdirs(jarCachePath));
+    Assert.assertTrue(fs.mkdirs(new Path(jarCacheRoot, "2024-08")));
+    Assert.assertTrue(fs.mkdirs(new Path(jarCacheRoot, "2024-07")));
+    Assert.assertTrue(fs.mkdirs(new Path(jarCacheRoot, "2024-06")));
+
+    // Keep the two most recent months; every delete of an older month must report success.
+    Assert.assertTrue(YarnHelixUtils.retainKLatestJarCachePaths(jarCacheRoot, 2, fs));
+
+    Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-09")));
+    Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-08")));
+    // Should be cleaned up
+    Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07")));
+    Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-06")));
+  }
}
\ No newline at end of file