This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a634612 [GOBBLIN-1091] Pass Yarn application id as part of AppMaster
and YarnTaskRunner's start up command[]
a634612 is described below
commit a63461257c3fcea8f4ff67087f8cb29be25d6baf
Author: sv2000 <[email protected]>
AuthorDate: Fri Mar 20 09:34:13 2020 -0700
[GOBBLIN-1091] Pass Yarn application id as part of AppMaster and
YarnTaskRunner's start up command[]
Closes #2933 from sv2000/yarnApplicationId
---
.../gobblin/cluster/GobblinClusterConfigurationKeys.java | 1 +
.../org/apache/gobblin/cluster/GobblinTaskRunner.java | 2 ++
.../main/java/org/apache/gobblin/cluster/SingleTask.java | 14 ++++++++++----
.../org/apache/gobblin/yarn/GobblinApplicationMaster.java | 15 +++++++++------
.../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | 6 ++++--
.../org/apache/gobblin/yarn/GobblinYarnTaskRunner.java | 10 ++++++----
.../main/java/org/apache/gobblin/yarn/YarnService.java | 2 ++
.../apache/gobblin/yarn/GobblinYarnAppLauncherTest.java | 4 ++--
8 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 7dbf7b1..128a5d6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -42,6 +42,7 @@ public class GobblinClusterConfigurationKeys {
// General Gobblin Cluster application configuration properties.
public static final String APPLICATION_NAME_OPTION_NAME = "app_name";
+ public static final String APPLICATION_ID_OPTION_NAME = "app_id";
public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
public static final String STANDALONE_CLUSTER_MODE_KEY =
GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 6b3fff7..b9f1c96 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -611,6 +611,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
Options options = new Options();
options.addOption("a",
GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true,
"Application name");
+ options.addOption("d",
GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true,
+ "Application id");
options.addOption("i",
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
"Helix instance name");
options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 4778371..9dd0a1c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -138,10 +138,16 @@ public class SingleTask {
String storeName = _workUnitFilePath.getParent().getName();
WorkUnit workUnit;
- if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
- workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
- } else {
- workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
+ try {
+ if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
+ workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
+ } else {
+ workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
+ }
+ } catch (IOException e) {
+ //Add workunitFilePath to the IOException message to aid debugging
+ throw new IOException("Exception retrieving state from state store for workunit: " + _workUnitFilePath.toString(),
+ e);
}
// The list of individual WorkUnits (flattened) to run
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 282156d..14488d5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -80,11 +80,11 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
@Getter
private final YarnService yarnService;
- public GobblinApplicationMaster(String applicationName, ContainerId
containerId, Config config,
+ public GobblinApplicationMaster(String applicationName, String
applicationId, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
- super(applicationName,
containerId.getApplicationAttemptId().getApplicationId().toString(),
-
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
-
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+ super(applicationName, applicationId,
GobblinClusterUtils.addDynamicConfig(config
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
Optional.<Path>absent());
String containerLogDir =
config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
@@ -204,6 +204,7 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
private static Options buildOptions() {
Options options = new Options();
options.addOption("a",
GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Yarn
application name");
+ options.addOption("d",
GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true, "Yarn
application id");
return options;
}
@@ -216,7 +217,8 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
Options options = buildOptions();
try {
CommandLine cmd = new DefaultParser().parse(options, args);
- if
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)) {
+ if
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) ||
+
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME))) {
printUsage(options);
System.exit(1);
}
@@ -231,7 +233,8 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
try (GobblinApplicationMaster applicationMaster = new
GobblinApplicationMaster(
-
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
containerId,
+
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
+
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME),
containerId,
ConfigFactory.load(), new YarnConfiguration())) {
applicationMaster.start();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 3367842..a6d7a88 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -548,7 +548,7 @@ public class GobblinYarnAppLauncher {
ContainerLaunchContext amContainerLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
amContainerLaunchContext.setLocalResources(appMasterLocalResources);
amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(resource.getMemory())));
+
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(),
resource.getMemory())));
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -729,7 +729,7 @@ public class GobblinYarnAppLauncher {
}
@VisibleForTesting
- protected String buildApplicationMasterCommand(int memoryMbs) {
+ protected String buildApplicationMasterCommand(String applicationId, int
memoryMbs) {
String appMasterClassName = GobblinApplicationMaster.class.getSimpleName();
return new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
@@ -741,6 +741,8 @@ public class GobblinYarnAppLauncher {
.append(" ").append(GobblinApplicationMaster.class.getName())
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
+ .append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+ .append(" ").append(applicationId)
.append("
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
.append("
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index dc68162..d59a429 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -59,9 +59,9 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
public static final String HELIX_YARN_INSTANCE_NAME_PREFIX =
GobblinYarnTaskRunner.class.getSimpleName();
- public GobblinYarnTaskRunner(String applicationName, String
helixInstanceName, ContainerId containerId, Config config,
+ public GobblinYarnTaskRunner(String applicationName, String applicationId,
String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
- super(applicationName, helixInstanceName, getApplicationId(containerId),
getTaskRunnerId(containerId),
+ super(applicationName, helixInstanceName, applicationId,
getTaskRunnerId(containerId),
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
appWorkDirOptional);
}
@@ -176,7 +176,8 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
try {
CommandLine cmd = new DefaultParser().parse(options, args);
if
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
|| !cmd
-
.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)) {
+
.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) ||
!cmd
+ .hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)) {
printUsage(options);
System.exit(1);
}
@@ -190,6 +191,7 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
ContainerId containerId =
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
String applicationName =
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+ String applicationId =
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME);
String helixInstanceName =
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
String helixInstanceTags =
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
@@ -199,7 +201,7 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
}
GobblinTaskRunner gobblinTaskRunner =
- new GobblinYarnTaskRunner(applicationName, helixInstanceName,
containerId, config,
+ new GobblinYarnTaskRunner(applicationName, applicationId,
helixInstanceName, containerId, config,
Optional.<Path>absent());
gobblinTaskRunner.start();
} catch (ParseException pe) {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 027be31..4586571 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -577,6 +577,8 @@ public class YarnService extends AbstractIdleService {
.append(" ").append(GobblinYarnTaskRunner.class.getName())
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
+ .append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+ .append(" ").append(this.applicationId)
.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
.append(" ").append(helixInstanceName);
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 0d2f3a5..adc8bd0 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -209,7 +209,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
@Test
public void testBuildApplicationMasterCommand() {
- String command =
this.gobblinYarnAppLauncher.buildApplicationMasterCommand(64);
+ String command =
this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456",
64);
// 41 is from 64 * 0.8 - 10
Assert.assertTrue(command.contains("-Xmx41"));
@@ -434,7 +434,7 @@ public class GobblinYarnAppLauncherTest implements
HelixMessageTestBase {
public TestApplicationMaster(String applicationName, ContainerId
containerId, Config config,
YarnConfiguration yarnConfiguration)
throws Exception {
- super(applicationName, containerId, config, yarnConfiguration);
+ super(applicationName,
containerId.getApplicationAttemptId().getApplicationId().toString(),
containerId, config, yarnConfiguration);
}
@Override