This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 93049c9  [GOBBLIN-1217] start metrics reporting with a few map-reduce properties
93049c9 is described below

commit 93049c9dabbd35d219824d7122f98db02398c722
Author: Arjun <[email protected]>
AuthorDate: Tue Jul 21 16:20:03 2020 -0700

    [GOBBLIN-1217] start metrics reporting with a few map-reduce properties
    
    Closes #3065 from arjun4084346/mapperNum
---
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 44 ++++++++++++++++------
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  5 ++-
 2 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index ed02d38..1b89ba3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
@@ -106,7 +108,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.SerializationUtils;
 import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
-
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job as a 
Hadoop MR job.
  *
@@ -147,6 +148,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
   private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
 
+  public static final String MR_TYPE_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
+  public static final String MAPPER_TASK_NUM_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
+  public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + 
"reporting.mapper.task.attempt.num";
+  public static final String REDUCER_TASK_NUM_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.reducer.task.num";
+  public static final String REDUCER_TASK_ATTEMPT_NUM_KEY = 
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + 
"reporting.reducer.task.attempt.num";
+
   private static final Splitter SPLITTER = 
Splitter.on(',').omitEmptyStrings().trimResults();
 
   private final Configuration conf;
@@ -725,6 +732,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
     @Override
     protected void setup(Context context) {
       final State gobblinJobState = 
HadoopUtils.getStateFromConf(context.getConfiguration());
+      TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+
       try (Closer closer = Closer.create()) {
         // Default for customizedProgressEnabled is false.
         this.customizedProgressEnabled = 
isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
@@ -771,6 +780,23 @@ public class MRJobLauncher extends AbstractJobLauncher {
         gobblinJobState.setProp(entry.getKey(), 
entry.getValue().unwrapped().toString());
       }
 
+      // add some more MR task related configs
+
+      String[] tokens = taskAttemptID.toString().split("_");
+      TaskType taskType = taskAttemptID.getTaskType();
+      gobblinJobState.setProp(MR_TYPE_KEY, taskType.name());
+
+      // a task attempt id should be like 
'attempt_1592863931636_2371636_m_000003_4'
+      if (tokens.length == 6) {
+        if (taskType.equals(TaskType.MAP)) {
+          gobblinJobState.setProp(MAPPER_TASK_NUM_KEY, tokens[tokens.length - 
2]);
+          gobblinJobState.setProp(MAPPER_TASK_ATTEMPT_NUM_KEY, 
tokens[tokens.length - 1]);
+        } else if (taskType.equals(TaskType.REDUCE)) {
+          gobblinJobState.setProp(REDUCER_TASK_NUM_KEY, tokens[tokens.length - 
2]);
+          gobblinJobState.setProp(REDUCER_TASK_ATTEMPT_NUM_KEY, 
tokens[tokens.length - 1]);
+        }
+      }
+
       this.taskExecutor = new TaskExecutor(configuration);
       this.taskStateTracker = new MRTaskStateTracker(context);
       this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
@@ -782,20 +808,16 @@ public class MRJobLauncher extends AbstractJobLauncher {
       }
 
       // Setup and start metrics reporting if metric reporting is enabled
-      if (Boolean.valueOf(
-          configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, 
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
+      if 
(Boolean.parseBoolean(configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, 
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
         this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
         try {
-          this.jobMetrics.get()
-              .startMetricReportingWithFileSuffix(gobblinJobState, 
context.getTaskAttemptID().toString());
+          
this.jobMetrics.get().startMetricReportingWithFileSuffix(gobblinJobState, 
taskAttemptID.toString());
         } catch (MultiReporterException ex) {
           //Fail the task if metric/event reporting failure is configured to 
be fatal.
-          boolean isMetricReportingFailureFatal = Boolean.valueOf(configuration
-              
.get(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
-                  
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL)));
-          boolean isEventReportingFailureFatal = Boolean.valueOf(configuration
-              
.get(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
-                  
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL)));
+          boolean isMetricReportingFailureFatal = 
configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+              
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+          boolean isEventReportingFailureFatal = 
configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+                  
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
           if (MetricReportUtils.shouldThrowException(LOG, ex, 
isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
             throw new RuntimeException(ex);
           }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 177e177..fb29049 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -17,14 +17,12 @@
 
 package org.apache.gobblin.yarn;
 
-import com.typesafe.config.Config;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +40,9 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**

Reply via email to