This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a634612  [GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]
a634612 is described below

commit a63461257c3fcea8f4ff67087f8cb29be25d6baf
Author: sv2000 <[email protected]>
AuthorDate: Fri Mar 20 09:34:13 2020 -0700

    [GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]
    
    Closes #2933 from sv2000/yarnApplicationId
---
 .../gobblin/cluster/GobblinClusterConfigurationKeys.java  |  1 +
 .../org/apache/gobblin/cluster/GobblinTaskRunner.java     |  2 ++
 .../main/java/org/apache/gobblin/cluster/SingleTask.java  | 14 ++++++++++----
 .../org/apache/gobblin/yarn/GobblinApplicationMaster.java | 15 +++++++++------
 .../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java   |  6 ++++--
 .../org/apache/gobblin/yarn/GobblinYarnTaskRunner.java    | 10 ++++++----
 .../main/java/org/apache/gobblin/yarn/YarnService.java    |  2 ++
 .../apache/gobblin/yarn/GobblinYarnAppLauncherTest.java   |  4 ++--
 8 files changed, 36 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 7dbf7b1..128a5d6 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -42,6 +42,7 @@ public class GobblinClusterConfigurationKeys {
 
   // General Gobblin Cluster application configuration properties.
   public static final String APPLICATION_NAME_OPTION_NAME = "app_name";
+  public static final String APPLICATION_ID_OPTION_NAME = "app_id";
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
   public static final String STANDALONE_CLUSTER_MODE_KEY = 
GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 6b3fff7..b9f1c96 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -611,6 +611,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     Options options = new Options();
     options.addOption("a", 
GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true,
         "Application name");
+    options.addOption("d", 
GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true,
+        "Application id");
     options.addOption("i", 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
         "Helix instance name");
     
options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 4778371..9dd0a1c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -138,10 +138,16 @@ public class SingleTask {
     String storeName = _workUnitFilePath.getParent().getName();
     WorkUnit workUnit;
 
-    if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
-      workUnit = _stateStores.getMwuStateStore().getAll(storeName, 
fileName).get(0);
-    } else {
-      workUnit = _stateStores.getWuStateStore().getAll(storeName, 
fileName).get(0);
+    try {
+      if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
+        workUnit = _stateStores.getMwuStateStore().getAll(storeName, 
fileName).get(0);
+      } else {
+        workUnit = _stateStores.getWuStateStore().getAll(storeName, 
fileName).get(0);
+      }
+    } catch (IOException e) {
+      //Add workunitFilePath to the IOException message to aid debugging
+      throw new IOException("Exception retrieving state from state store for 
workunit: " + _workUnitFilePath.toString(),
+          e);
     }
 
     // The list of individual WorkUnits (flattened) to run
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 282156d..14488d5 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -80,11 +80,11 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
   @Getter
   private final YarnService yarnService;
 
-  public GobblinApplicationMaster(String applicationName, ContainerId 
containerId, Config config,
+  public GobblinApplicationMaster(String applicationName, String 
applicationId, ContainerId containerId, Config config,
       YarnConfiguration yarnConfiguration) throws Exception {
-    super(applicationName, 
containerId.getApplicationAttemptId().getApplicationId().toString(),
-        
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
-            
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+    super(applicationName, applicationId, 
GobblinClusterUtils.addDynamicConfig(config
+            .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+                
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
         Optional.<Path>absent());
 
     String containerLogDir = 
config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
@@ -204,6 +204,7 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
   private static Options buildOptions() {
     Options options = new Options();
     options.addOption("a", 
GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Yarn 
application name");
+    options.addOption("d", 
GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true, "Yarn 
application id");
     return options;
   }
 
@@ -216,7 +217,8 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
     Options options = buildOptions();
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
-      if 
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)) {
+      if 
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) ||
+          
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME))) {
         printUsage(options);
         System.exit(1);
       }
@@ -231,7 +233,8 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
           
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
 
       try (GobblinApplicationMaster applicationMaster = new 
GobblinApplicationMaster(
-          
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
 containerId,
+          
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
+          
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME), 
containerId,
           ConfigFactory.load(), new YarnConfiguration())) {
 
         applicationMaster.start();
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 3367842..a6d7a88 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -548,7 +548,7 @@ public class GobblinYarnAppLauncher {
     ContainerLaunchContext amContainerLaunchContext = 
Records.newRecord(ContainerLaunchContext.class);
     amContainerLaunchContext.setLocalResources(appMasterLocalResources);
     
amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-    
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(resource.getMemory())));
+    
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(),
 resource.getMemory())));
 
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -729,7 +729,7 @@ public class GobblinYarnAppLauncher {
   }
 
   @VisibleForTesting
-  protected String buildApplicationMasterCommand(int memoryMbs) {
+  protected String buildApplicationMasterCommand(String applicationId, int 
memoryMbs) {
     String appMasterClassName = GobblinApplicationMaster.class.getSimpleName();
     return new StringBuilder()
         
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
@@ -741,6 +741,8 @@ public class GobblinYarnAppLauncher {
         .append(" ").append(GobblinApplicationMaster.class.getName())
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
+        .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+        .append(" ").append(applicationId)
         .append(" 
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
             appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
         .append(" 
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index dc68162..d59a429 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -59,9 +59,9 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 
   public static final String HELIX_YARN_INSTANCE_NAME_PREFIX = 
GobblinYarnTaskRunner.class.getSimpleName();
 
-  public GobblinYarnTaskRunner(String applicationName, String 
helixInstanceName, ContainerId containerId, Config config,
+  public GobblinYarnTaskRunner(String applicationName, String applicationId, 
String helixInstanceName, ContainerId containerId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
-    super(applicationName, helixInstanceName, getApplicationId(containerId), 
getTaskRunnerId(containerId),
+    super(applicationName, helixInstanceName, applicationId, 
getTaskRunnerId(containerId),
         
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
             
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
 appWorkDirOptional);
   }
@@ -176,7 +176,8 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
       if 
(!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) 
|| !cmd
-          
.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)) {
+          
.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) || 
!cmd
+    .hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)) {
         printUsage(options);
         System.exit(1);
       }
@@ -190,6 +191,7 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
       ContainerId containerId =
           
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
       String applicationName = 
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+      String applicationId = 
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME);
       String helixInstanceName = 
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
       String helixInstanceTags = 
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
 
@@ -199,7 +201,7 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
       }
 
       GobblinTaskRunner gobblinTaskRunner =
-          new GobblinYarnTaskRunner(applicationName, helixInstanceName, 
containerId, config,
+          new GobblinYarnTaskRunner(applicationName, applicationId, 
helixInstanceName, containerId, config,
               Optional.<Path>absent());
       gobblinTaskRunner.start();
     } catch (ParseException pe) {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 027be31..4586571 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -577,6 +577,8 @@ public class YarnService extends AbstractIdleService {
         .append(" ").append(GobblinYarnTaskRunner.class.getName())
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
+        .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+        .append(" ").append(this.applicationId)
         .append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
         .append(" ").append(helixInstanceName);
 
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 0d2f3a5..adc8bd0 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -209,7 +209,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
 
   @Test
   public void testBuildApplicationMasterCommand() {
-    String command = 
this.gobblinYarnAppLauncher.buildApplicationMasterCommand(64);
+    String command = 
this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456",
 64);
 
     // 41 is from 64 * 0.8 - 10
     Assert.assertTrue(command.contains("-Xmx41"));
@@ -434,7 +434,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
     public TestApplicationMaster(String applicationName, ContainerId 
containerId, Config config,
         YarnConfiguration yarnConfiguration)
         throws Exception {
-      super(applicationName, containerId, config, yarnConfiguration);
+      super(applicationName, 
containerId.getApplicationAttemptId().getApplicationId().toString(), 
containerId, config, yarnConfiguration);
     }
 
     @Override

Reply via email to