This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 93049c9 [GOBBLIN-1217] start metrics reporting with a few map-reduce properties
93049c9 is described below
commit 93049c9dabbd35d219824d7122f98db02398c722
Author: Arjun <[email protected]>
AuthorDate: Tue Jul 21 16:20:03 2020 -0700
[GOBBLIN-1217] start metrics reporting with a few map-reduce properties
Closes #3065 from arjun4084346/mapperNum
---
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 44 ++++++++++++++++------
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 5 ++-
2 files changed, 36 insertions(+), 13 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index ed02d38..1b89ba3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
@@ -106,7 +108,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
-
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job as a
Hadoop MR job.
*
@@ -147,6 +148,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+ public static final String MR_TYPE_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
+ public static final String MAPPER_TASK_NUM_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
+ public static final String MAPPER_TASK_ATTEMPT_NUM_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX +
"reporting.mapper.task.attempt.num";
+ public static final String REDUCER_TASK_NUM_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.reducer.task.num";
+ public static final String REDUCER_TASK_ATTEMPT_NUM_KEY =
ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX +
"reporting.reducer.task.attempt.num";
+
private static final Splitter SPLITTER =
Splitter.on(',').omitEmptyStrings().trimResults();
private final Configuration conf;
@@ -725,6 +732,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
@Override
protected void setup(Context context) {
final State gobblinJobState =
HadoopUtils.getStateFromConf(context.getConfiguration());
+ TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+
try (Closer closer = Closer.create()) {
// Default for customizedProgressEnabled is false.
this.customizedProgressEnabled =
isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
@@ -771,6 +780,23 @@ public class MRJobLauncher extends AbstractJobLauncher {
gobblinJobState.setProp(entry.getKey(),
entry.getValue().unwrapped().toString());
}
+ // add some more MR task related configs
+
+ String[] tokens = taskAttemptID.toString().split("_");
+ TaskType taskType = taskAttemptID.getTaskType();
+ gobblinJobState.setProp(MR_TYPE_KEY, taskType.name());
+
+ // a task attempt id should be like
'attempt_1592863931636_2371636_m_000003_4'
+ if (tokens.length == 6) {
+ if (taskType.equals(TaskType.MAP)) {
+ gobblinJobState.setProp(MAPPER_TASK_NUM_KEY, tokens[tokens.length -
2]);
+ gobblinJobState.setProp(MAPPER_TASK_ATTEMPT_NUM_KEY,
tokens[tokens.length - 1]);
+ } else if (taskType.equals(TaskType.REDUCE)) {
+ gobblinJobState.setProp(REDUCER_TASK_NUM_KEY, tokens[tokens.length -
2]);
+ gobblinJobState.setProp(REDUCER_TASK_ATTEMPT_NUM_KEY,
tokens[tokens.length - 1]);
+ }
+ }
+
this.taskExecutor = new TaskExecutor(configuration);
this.taskStateTracker = new MRTaskStateTracker(context);
this.serviceManager = new
ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
@@ -782,20 +808,16 @@ public class MRJobLauncher extends AbstractJobLauncher {
}
// Setup and start metrics reporting if metric reporting is enabled
- if (Boolean.valueOf(
- configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
+ if
(Boolean.parseBoolean(configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
try {
- this.jobMetrics.get()
- .startMetricReportingWithFileSuffix(gobblinJobState,
context.getTaskAttemptID().toString());
+
this.jobMetrics.get().startMetricReportingWithFileSuffix(gobblinJobState,
taskAttemptID.toString());
} catch (MultiReporterException ex) {
//Fail the task if metric/event reporting failure is configured to
be fatal.
- boolean isMetricReportingFailureFatal = Boolean.valueOf(configuration
-
.get(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
-
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL)));
- boolean isEventReportingFailureFatal = Boolean.valueOf(configuration
-
.get(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
-
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL)));
+ boolean isMetricReportingFailureFatal =
configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+ boolean isEventReportingFailureFatal =
configuration.getBoolean(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
if (MetricReportUtils.shouldThrowException(LOG, ex,
isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
throw new RuntimeException(ex);
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 177e177..fb29049 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -17,14 +17,12 @@
package org.apache.gobblin.yarn;
-import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +40,9 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
/**