This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0e0d88c  MAPREDUCE-7278. Speculative execution behavior is observed 
even when mapreduce.map.speculative and mapreduce.reduce.speculative are false
0e0d88c is described below

commit 0e0d88ca97041e64bb73483f39ec68a01bf30506
Author: Wilfred Spiegelenburg <wilfr...@apache.org>
AuthorDate: Thu May 28 13:23:01 2020 +1000

    MAPREDUCE-7278. Speculative execution behavior is observed even when 
mapreduce.map.speculative and mapreduce.reduce.speculative are false
    
    Contributed by Tarun Parimi.
    
    (cherry picked from commit 10db97df1c8562a9e29b00e60d5bde1773c09188)
---
 .../hadoop/mapreduce/v2/app/job/impl/TaskImpl.java |  25 +++--
 .../mapreduce/v2/TestSpeculativeExecution.java     | 107 ++++++++++++++++++++-
 2 files changed, 124 insertions(+), 8 deletions(-)

diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index ce3b3cc..b34cd7f 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -142,7 +143,8 @@ public abstract class TaskImpl implements Task, 
EventHandler<TaskEvent> {
   private boolean historyTaskStartGenerated = false;
   // Launch time reported in history events.
   private long launchTime;
-  
+  private boolean speculationEnabled = false;
+
   private static final SingleArcTransition<TaskImpl, TaskEvent> 
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent> 
@@ -325,6 +327,9 @@ public abstract class TaskImpl implements Task, 
EventHandler<TaskEvent> {
     this.appContext = appContext;
     this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                                             
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+    this.speculationEnabled = taskType.equals(TaskType.MAP) ?
+        conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) :
+        conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -1079,13 +1084,19 @@ public abstract class TaskImpl implements Task, 
EventHandler<TaskEvent> {
         if (task.successfulAttempt == null) {
           boolean shouldAddNewAttempt = true;
           if (task.inProgressAttempts.size() > 0) {
-            // if not all of the inProgressAttempts are hanging for resource
-            for (TaskAttemptId attemptId : task.inProgressAttempts) {
-              if (((TaskAttemptImpl) task.getAttempt(attemptId))
-                  .isContainerAssigned()) {
-                shouldAddNewAttempt = false;
-                break;
+            if(task.speculationEnabled) {
+              // if not all of the inProgressAttempts are hanging for resource
+              for (TaskAttemptId attemptId : task.inProgressAttempts) {
+                if (((TaskAttemptImpl) task.getAttempt(attemptId))
+                    .isContainerAssigned()) {
+                  shouldAddNewAttempt = false;
+                  break;
+                }
               }
+            } else {
+              // No need to add new attempt if there are in progress attempts
+              // when speculation is false
+              shouldAddNewAttempt = false;
             }
           }
           if (shouldAddNewAttempt) {
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
index fe21f07..8527dc3 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
@@ -192,6 +192,46 @@ public class TestSpeculativeExecution {
     }
   }
 
+  public static class FailOnceMapper extends
+      Mapper<Object, Text, Text, IntWritable> {
+
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      TaskAttemptID taid = context.getTaskAttemptID();
+      try{
+        Thread.sleep(2000);
+      } catch(InterruptedException ie) {
+        // Ignore
+      }
+      // Fail mapper only for first attempt
+      if (taid.getId() == 0) {
+        throw new RuntimeException("Failing this mapper");
+      }
+
+      context.write(value, new IntWritable(1));
+    }
+  }
+
+  public static class FailOnceReducer extends
+      Reducer<Text, IntWritable, Text, IntWritable> {
+
+    public void reduce(Text key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+      TaskAttemptID taid = context.getTaskAttemptID();
+      try{
+        Thread.sleep(2000);
+      } catch(InterruptedException ie) {
+        // Ignore
+      }
+      // Fail reduce only for first attempt
+      if (taid.getId() == 0) {
+        throw new RuntimeException("Failing this reducer");
+      }
+      context.write(key, new IntWritable(0));
+    }
+  }
+
+
   @Test
   public void testSpeculativeExecution() throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
@@ -218,6 +258,30 @@ public class TestSpeculativeExecution {
     Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
             .getValue());
 
+
+    /*------------------------------------------------------------------
+     * Test that Map/Red does not speculate if MAP_SPECULATIVE and
+     * REDUCE_SPECULATIVE are both false. When map tasks fail once and time 
out,
+     * we shouldn't launch two simultaneous attempts. MAPREDUCE-7278
+     * -----------------------------------------------------------------
+     */
+    job = runNonSpecFailOnceTest();
+
+    succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    counters = job.getCounters();
+    // We will have 4 total since 2 map tasks fail and relaunch attempt once
+    Assert.assertEquals(4,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue());
+    Assert.assertEquals(4,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+    // Ensure no maps or reduces killed due to accidental speculation
+    Assert.assertEquals(0,
+        counters.findCounter(JobCounter.NUM_KILLED_MAPS).getValue());
+    Assert.assertEquals(0,
+        counters.findCounter(JobCounter.NUM_KILLED_REDUCES).getValue());
+
     /*----------------------------------------------------------------------
      * Test that Mapper speculates if MAP_SPECULATIVE is true and
      * REDUCE_SPECULATIVE is false.
@@ -295,7 +359,48 @@ public class TestSpeculativeExecution {
 
     // Delete output directory if it exists.
     try {
-      localFs.delete(TEST_OUT_DIR,true);
+      localFs.delete(TEST_OUT_DIR, true);
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // Creates the Job Configuration
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.setMaxMapAttempts(2);
+
+    job.submit();
+
+    return job;
+  }
+
+  private Job runNonSpecFailOnceTest()
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    Path first = createTempFile("specexec_map_input1", "a\nz");
+    Path secnd = createTempFile("specexec_map_input2", "a\nz");
+
+    Configuration conf = mrCluster.getConfig();
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    // Prevent blacklisting since tasks fail once
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, false);
+    // Setting small task exit timeout values reproduces MAPREDUCE-7278
+    conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 20);
+    conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(TestSpeculativeExecution.class);
+    job.setMapperClass(FailOnceMapper.class);
+    job.setReducerClass(FailOnceReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setNumReduceTasks(2);
+    FileInputFormat.setInputPaths(job, first);
+    FileInputFormat.addInputPath(job, secnd);
+    FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
+
+    // Delete output directory if it exists.
+    try {
+      localFs.delete(TEST_OUT_DIR, true);
     } catch (IOException e) {
       // ignore
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

Reply via email to