This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2563b8e [GOBBLIN-1386] Make gobblin yarn application be able to add
zip files as local resources, and make yarn class path to be configurable
2563b8e is described below
commit 2563b8ed21b99468efe0013bca7726597538c0fe
Author: Zihan Li <[email protected]>
AuthorDate: Thu Jan 28 11:57:35 2021 -0800
[GOBBLIN-1386] Make gobblin yarn application be able to add zip files as
local resources, and make yarn class path to be configurable
remove assumption that we only support 2 types in
union
address typos
Make gobblin yarn application be able to add zip
files as local resources, and make yarn class path
to be configurable
address comments
remove unused import
Closes #3207 from ZihanLi58/GOBBLIN-1368
---
.../gobblin/yarn/GobblinApplicationMaster.java | 1 +
.../gobblin/yarn/GobblinYarnAppLauncher.java | 16 ++++----
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 3 ++
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 45 +++++++++++++++++++++-
.../java/org/apache/gobblin/yarn/YarnService.java | 17 +++-----
5 files changed, 61 insertions(+), 21 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 192f86f..9706314 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -93,6 +93,7 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
this.applicationLauncher
.addService(logCopier);
}
+ YarnHelixUtils.setYarnClassPath(config, yarnConfiguration);
YarnHelixUtils.setAdditionalYarnClassPath(config, yarnConfiguration);
this.yarnService = buildYarnService(this.config, applicationName,
this.applicationId, yarnConfiguration, this.fs);
this.applicationLauncher.addService(this.yarnService);
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index fe24f0f..01cb964 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -250,6 +250,7 @@ public class GobblinYarnAppLauncher {
InstanceType.SPECTATOR, zkConnectionString);
this.yarnConfiguration = yarnConfiguration;
+ YarnHelixUtils.setYarnClassPath(config, this.yarnConfiguration);
YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
this.yarnConfiguration.set("fs.automatic.close", "false");
this.yarnClient = YarnClient.createYarnClient();
@@ -696,8 +697,12 @@ public class GobblinYarnAppLauncher {
Optional.of(appMasterResources), appFilesDestDir, localFs);
}
if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY))
{
-
addAppRemoteFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY),
- appMasterResources);
+
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY),
+ appMasterResources, yarnConfiguration);
+ }
+ if
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_ZIPS_REMOTE_KEY)) {
+
YarnHelixUtils.addRemoteZipsToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_ZIPS_REMOTE_KEY),
+ appMasterResources, yarnConfiguration);
}
if
(this.config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) {
Path appFilesDestDir = new Path(appMasterWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME);
@@ -783,13 +788,6 @@ public class GobblinYarnAppLauncher {
}
}
- private void addAppRemoteFiles(String hdfsFileList, Map<String,
LocalResource> resourceMap)
- throws IOException {
- for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) {
- YarnHelixUtils.addFileAsLocalResource(this.fs, new Path(hdfsFilePath),
LocalResourceType.FILE, resourceMap);
- }
- }
-
private void addJobConfPackage(String jobConfPackagePath, Path destDir,
Map<String, LocalResource> resourceMap)
throws IOException {
Path srcFilePath = new Path(jobConfPackagePath);
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 1a4bbc3..956848e 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -44,6 +44,7 @@ public class GobblinYarnConfigurationKeys {
public static final String APP_MASTER_JARS_KEY = GOBBLIN_YARN_PREFIX +
"app.master.jars";
public static final String APP_MASTER_FILES_LOCAL_KEY = GOBBLIN_YARN_PREFIX
+ "app.master.files.local";
public static final String APP_MASTER_FILES_REMOTE_KEY = GOBBLIN_YARN_PREFIX
+ "app.master.files.remote";
+ public static final String APP_MASTER_ZIPS_REMOTE_KEY = GOBBLIN_YARN_PREFIX
+ "app.master.zips.remote";
public static final String APP_MASTER_WORK_DIR_NAME = "appmaster";
public static final String APP_MASTER_JVM_ARGS_KEY = GOBBLIN_YARN_PREFIX +
"app.master.jvm.args";
public static final String APP_MASTER_SERVICE_CLASSES = GOBBLIN_YARN_PREFIX
+ "app.master.serviceClasses";
@@ -64,6 +65,7 @@ public class GobblinYarnConfigurationKeys {
public static final String CONTAINER_JARS_KEY = GOBBLIN_YARN_PREFIX +
"container.jars";
public static final String CONTAINER_FILES_LOCAL_KEY = GOBBLIN_YARN_PREFIX +
"container.files.local";
public static final String CONTAINER_FILES_REMOTE_KEY = GOBBLIN_YARN_PREFIX
+ "container.files.remote";
+ public static final String CONTAINER_ZIPS_REMOTE_KEY = GOBBLIN_YARN_PREFIX +
"container.zips.remote";
public static final String CONTAINER_WORK_DIR_NAME = "container";
public static final String CONTAINER_JVM_ARGS_KEY = GOBBLIN_YARN_PREFIX +
"container.jvm.args";
public static final String CONTAINER_HOST_AFFINITY_ENABLED =
GOBBLIN_YARN_PREFIX + "container.affinity.enabled";
@@ -124,4 +126,5 @@ public class GobblinYarnConfigurationKeys {
public static final String GOBBLIN_YARN_AZKABAN_CLASS_LOG_LEVELS =
GOBBLIN_YARN_PREFIX + "azkaban.class.logLevels";
//Container classpaths properties
public static final String GOBBLIN_YARN_ADDITIONAL_CLASSPATHS =
GOBBLIN_YARN_PREFIX + "additional.classpaths";
+ public static final String GOBBLIN_YARN_CLASSPATHS = GOBBLIN_YARN_PREFIX +
"classpaths";
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 4ecc8b9..f9cda96 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -17,12 +17,15 @@
package org.apache.gobblin.yarn;
+import com.google.common.base.Splitter;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -57,6 +60,10 @@ import org.apache.gobblin.util.ConfigUtils;
public class YarnHelixUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(YarnHelixUtils.class);
+ private static final Splitter SPLITTER =
Splitter.on(',').omitEmptyStrings().trimResults();
+
+ private static final Splitter ZIP_SPLITTER =
Splitter.on('#').omitEmptyStrings().trimResults();
+
/**
* Write a {@link Token} to a given file.
*
@@ -121,6 +128,12 @@ public class YarnHelixUtils {
*/
public static void addFileAsLocalResource(FileSystem fs, Path destFilePath,
LocalResourceType resourceType,
Map<String, LocalResource> resourceMap) throws IOException {
+ addFileAsLocalResource(fs, destFilePath, resourceType, resourceMap,
destFilePath.getName());
+ }
+
+
+ public static void addFileAsLocalResource(FileSystem fs, Path destFilePath,
LocalResourceType resourceType,
+ Map<String, LocalResource> resourceMap, String resourceName) throws
IOException {
LocalResource fileResource = Records.newRecord(LocalResource.class);
FileStatus fileStatus = fs.getFileStatus(destFilePath);
fileResource.setResource(ConverterUtils.getYarnUrlFromPath(destFilePath));
@@ -128,7 +141,7 @@ public class YarnHelixUtils {
fileResource.setTimestamp(fileStatus.getModificationTime());
fileResource.setType(resourceType);
fileResource.setVisibility(LocalResourceVisibility.APPLICATION);
- resourceMap.put(destFilePath.getName(), fileResource);
+ resourceMap.put(resourceName, fileResource);
}
/**
@@ -178,6 +191,36 @@ public class YarnHelixUtils {
}
}
+ public static void setYarnClassPath(Config config, Configuration
yarnConfiguration) {
+ if (!ConfigUtils.emptyIfNotPresent(config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CLASSPATHS).equals(
+ StringUtils.EMPTY)){
+
yarnConfiguration.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
config.getString(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CLASSPATHS));
+ }
+ }
+
+ public static void addRemoteFilesToLocalResources(String hdfsFileList,
Map<String, LocalResource> resourceMap, Configuration yarnConfiguration) throws
IOException {
+ for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) {
+ Path srcFilePath = new Path(hdfsFilePath);
+ YarnHelixUtils.addFileAsLocalResource(
+ srcFilePath.getFileSystem(yarnConfiguration), srcFilePath,
LocalResourceType.FILE, resourceMap);
+ }
+ }
+
+ public static void addRemoteZipsToLocalResources(String hdfsFileList,
Map<String, LocalResource> resourceMap, Configuration yarnConfiguration)
+ throws IOException {
+ for (String zipFileWithName : SPLITTER.split(hdfsFileList)) {
+ Iterator<String> zipFileAndName =
ZIP_SPLITTER.split(zipFileWithName).iterator();
+ Path srcFilePath = new Path(zipFileAndName.next());
+ try {
+
YarnHelixUtils.addFileAsLocalResource(srcFilePath.getFileSystem(yarnConfiguration),
srcFilePath, LocalResourceType.ARCHIVE,
+ resourceMap, zipFileAndName.next());
+ } catch (Exception e) {
+ throw new IOException(String.format("Fail to extract %s as local
resources, maybe a wrong pattern, "
+ + "correct pattern should be {zipPath}#{targetUnzippedName}",
zipFileAndName), e);
+ }
+ }
+ }
+
/**
* Return the identifier of the containerId. The identifier is the substring
in the containerId representing
* the sequential number of the container.
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 27bfe9e..3c34eb6 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -123,8 +123,6 @@ public class YarnService extends AbstractIdleService {
private static final Logger LOGGER =
LoggerFactory.getLogger(YarnService.class);
- private static final Splitter SPLITTER =
Splitter.on(',').omitEmptyStrings().trimResults();
-
private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN";
private final String applicationName;
@@ -514,9 +512,13 @@ public class YarnService extends AbstractIdleService {
new Path(containerWorkDir,
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);
if
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY)) {
-
addRemoteAppFiles(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY),
resourceMap);
+
YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY),
+ resourceMap, yarnConfiguration);
+ }
+ if
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_ZIPS_REMOTE_KEY)) {
+
YarnHelixUtils.addRemoteZipsToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_ZIPS_REMOTE_KEY),
+ resourceMap, yarnConfiguration);
}
-
ContainerLaunchContext containerLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
containerLaunchContext.setLocalResources(resourceMap);
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
@@ -547,13 +549,6 @@ public class YarnService extends AbstractIdleService {
}
}
- private void addRemoteAppFiles(String hdfsFileList, Map<String,
LocalResource> resourceMap) throws IOException {
- for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) {
- Path srcFilePath = new Path(hdfsFilePath);
- YarnHelixUtils.addFileAsLocalResource(
- srcFilePath.getFileSystem(this.yarnConfiguration), srcFilePath,
LocalResourceType.FILE, resourceMap);
- }
- }
private ByteBuffer getSecurityTokens() throws IOException {
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();