This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7389a193b [GOBBLIN-1814] Add `MRJobLauncher` configurability for any
failing mapper to be fatal to the MR job (#3675)
7389a193b is described below
commit 7389a193bc3075612339aeff910f7d42b24812fc
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Apr 11 17:54:45 2023 -0700
[GOBBLIN-1814] Add `MRJobLauncher` configurability for any failing mapper
to be fatal to the MR job (#3675)
---
.../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 ++
.../java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java | 9 ++++++++-
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b3165ca05..e4c051aa1 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -688,10 +688,12 @@ public class ConfigurationKeys {
/** Specifies a static location in HDFS to upload jars to. Useful for
sharing jars across different Gobblin runs.*/
public static final String MR_JARS_DIR = "mr.jars.dir";
public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
+ public static final String MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY =
"mr.job.map.failure.is.fatal";
public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
public static final String MR_REPORT_METRICS_AS_COUNTERS_KEY =
"mr.report.metrics.as.counters";
public static final boolean DEFAULT_MR_REPORT_METRICS_AS_COUNTERS = false;
public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
+ public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index a35210389..9f7c38360 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -212,7 +212,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps,
this.conf);
// Let the job and all mappers finish even if some mappers fail
- this.conf.set("mapreduce.map.failures.maxpercent", "100"); // For Hadoop
2.x
+ this.conf.set("mapreduce.map.failures.maxpercent",
isMapperFailureFatalEnabled(this.jobProps) ? "0" : "100"); // For Hadoop 2.x
// Do not cancel delegation tokens after job has completed (HADOOP-7002)
this.conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens",
false);
@@ -504,6 +504,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
&&
Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
}
+  /**
+   * Returns whether the failure of any single mapper should fail the whole MR job.
+   *
+   * <p>Reads {@link ConfigurationKeys#MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY}. When the key is set, its
+   * value always wins, so an explicit {@code false} is honored even if
+   * {@link ConfigurationKeys#DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL} is changed to {@code true}.
+   * The default applies only when the key is absent. The launcher uses the result to choose
+   * {@code mapreduce.map.failures.maxpercent} of {@code "0"} (fatal) or {@code "100"} (tolerated).
+   *
+   * @param properties the job properties
+   * @return {@code true} if a mapper failure should be fatal to the job
+   */
+  static boolean isMapperFailureFatalEnabled(Properties properties) {
+    return Boolean.parseBoolean(properties.getProperty(
+        ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY,
+        Boolean.toString(ConfigurationKeys.DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL)));
+  }
+
@VisibleForTesting
static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration
conf, JobState jobState, Job job)
throws IOException {