This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7389a193b [GOBBLIN-1814] Add `MRJobLauncher` configurability for any failing mapper to be fatal to the MR job (#3675)
7389a193b is described below

commit 7389a193bc3075612339aeff910f7d42b24812fc
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Apr 11 17:54:45 2023 -0700

    [GOBBLIN-1814] Add `MRJobLauncher` configurability for any failing mapper to be fatal to the MR job (#3675)
---
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 ++
 .../java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java | 9 ++++++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b3165ca05..e4c051aa1 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -688,10 +688,12 @@ public class ConfigurationKeys {
   /** Specifies a static location in HDFS to upload jars to. Useful for 
sharing jars across different Gobblin runs.*/
   public static final String MR_JARS_DIR = "mr.jars.dir";
   public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
+  public static final String MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY = 
"mr.job.map.failure.is.fatal";
   public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
   public static final String MR_REPORT_METRICS_AS_COUNTERS_KEY = 
"mr.report.metrics.as.counters";
   public static final boolean DEFAULT_MR_REPORT_METRICS_AS_COUNTERS = false;
   public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
+  public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
   public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index a35210389..9f7c38360 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -212,7 +212,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps, 
this.conf);
 
     // Let the job and all mappers finish even if some mappers fail
-    this.conf.set("mapreduce.map.failures.maxpercent", "100"); // For Hadoop 
2.x
+    this.conf.set("mapreduce.map.failures.maxpercent", 
isMapperFailureFatalEnabled(this.jobProps) ? "0" : "100"); // For Hadoop 2.x
 
     // Do not cancel delegation tokens after job has completed (HADOOP-7002)
     this.conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", 
false);
@@ -504,6 +504,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
         && 
Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
   }
 
+  /** @return the configured mapper-failure-is-fatal flag; the default is used only when the key is absent, so an explicit "false" always wins */
+  static boolean isMapperFailureFatalEnabled(Properties properties) {
+    String key = ConfigurationKeys.MR_JOB_MAPPER_FAILURE_IS_FATAL_KEY;
+    return properties.containsKey(key) ? Boolean.parseBoolean(properties.getProperty(key))
+        : ConfigurationKeys.DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL;
+  }
+
   @VisibleForTesting
   static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration 
conf, JobState jobState, Job job)
       throws IOException {

Reply via email to