Repository: hadoop
Updated Branches:
  refs/heads/branch-2 1b91ebb71 -> a0b076785


MAPREDUCE-6740. Enforce mapreduce.task.timeout to be at least 
mapreduce.task.progress-report.interval. (Haibo Chen via kasha)

(cherry picked from commit 537095d13cd38212ed162e0a360bdd9a8bd83498)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0b07678
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0b07678
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0b07678

Branch: refs/heads/branch-2
Commit: a0b07678568a4bdae19ad6245ba5d4d21324dccc
Parents: 1b91ebb
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Sep 21 18:30:11 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Sep 21 18:30:42 2016 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/TaskHeartbeatHandler.java  | 24 ++++++-
 .../v2/app/TestTaskHeartbeatHandler.java        | 67 ++++++++++++++++++++
 .../java/org/apache/hadoop/mapred/Task.java     |  8 ++-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  9 ++-
 .../hadoop/mapreduce/util/MRJobConfUtil.java    | 16 +++++
 5 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0b07678/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
index 303b4c1..6a716c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
@@ -23,10 +23,12 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -67,7 +69,7 @@ public class TaskHeartbeatHandler extends AbstractService {
   //received from a task.
   private Thread lostTaskCheckerThread;
   private volatile boolean stopped;
-  private int taskTimeOut = 5 * 60 * 1000;// 5 mins
+  private long taskTimeOut;
   private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
 
   private final EventHandler eventHandler;
@@ -87,7 +89,19 @@ public class TaskHeartbeatHandler extends AbstractService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-    taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+    taskTimeOut = conf.getLong(
+        MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
+
+    // enforce task timeout is at least twice as long as task report interval
+    long taskProgressReportIntervalMillis = MRJobConfUtil.
+        getTaskProgressReportInterval(conf);
+    long minimumTaskTimeoutAllowed = taskProgressReportIntervalMillis * 2;
+    if(taskTimeOut < minimumTaskTimeoutAllowed) {
+      taskTimeOut = minimumTaskTimeoutAllowed;
+      LOG.info("Task timeout must be at least twice as long as the task " +
+          "status report interval. Setting task timeout to " + taskTimeOut);
+    }
+
     taskTimeOutCheckInterval =
         conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
   }
@@ -140,7 +154,7 @@ public class TaskHeartbeatHandler extends AbstractService {
 
         while (iterator.hasNext()) {
           Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
-          boolean taskTimedOut = (taskTimeOut > 0) && 
+          boolean taskTimedOut = (taskTimeOut > 0) &&
               (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
            
           if(taskTimedOut) {
@@ -163,4 +177,8 @@ public class TaskHeartbeatHandler extends AbstractService {
     }
   }
 
+  @VisibleForTesting
+  public long getTaskTimeOut() {
+    return taskTimeOut;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0b07678/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
index e9a0989..2623849 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -50,6 +51,9 @@ public class TestTaskHeartbeatHandler {
     
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); //10 ms
+    // set TASK_PROGRESS_REPORT_INTERVAL to at most half of TASK_TIMEOUT
+    // so that TASK_TIMEOUT is not overridden
+    conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5);
     conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
     
     hb.init(conf);
@@ -68,4 +72,67 @@ public class TestTaskHeartbeatHandler {
     }
   }
 
+  /**
+   * Test if the final heartbeat timeout is set correctly when task progress
+   * report interval is set bigger than the task timeout in the configuration.
+   */
+  @Test
+  public void testTaskTimeoutConfigSmallerThanTaskProgressReportInterval() {
+    testTaskTimeoutWrtProgressReportInterval(1000L, 5000L);
+  }
+
+  /**
+   * Test if the final heartbeat timeout is set correctly when task progress
+   * report interval is set smaller than the task timeout in the configuration.
+   */
+  @Test
+  public void testTaskTimeoutConfigBiggerThanTaskProgressReportInterval() {
+    testTaskTimeoutWrtProgressReportInterval(5000L, 1000L);
+  }
+
+  /**
+   * Test if the final heartbeat timeout is set correctly when task progress
+   * report interval is not set in the configuration.
+   */
+  @Test
+  public void testTaskTimeoutConfigWithoutTaskProgressReportInterval() {
+    final long taskTimeoutConfiged = 2000L;
+
+    final Configuration conf = new Configuration();
+    conf.setLong(MRJobConfig.TASK_TIMEOUT, taskTimeoutConfiged);
+
+    final long expectedTimeout = taskTimeoutConfiged;
+    verifyTaskTimeoutConfig(conf, expectedTimeout);
+  }
+
+  /**
+   * Test if task timeout is set properly in response to the configuration of
+   * the task progress report interval.
+   */
+  private static void testTaskTimeoutWrtProgressReportInterval(
+      long timeoutConfig, long taskreportInterval) {
+    final Configuration conf = new Configuration();
+    conf.setLong(MRJobConfig.TASK_TIMEOUT, timeoutConfig);
+    conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, taskreportInterval);
+
+    // expected task timeout is at least twice as long as task report interval
+    final long expectedTimeout = Math.max(timeoutConfig, taskreportInterval*2);
+    verifyTaskTimeoutConfig(conf, expectedTimeout);
+  }
+
+  /**
+   * Verify task timeout is set as expected in TaskHeartbeatHandler with the
+   * given configuration.
+   * @param conf the configuration
+   * @param expectedTimeout expected timeout value
+   */
+  private static void verifyTaskTimeoutConfig(final Configuration conf,
+      final long expectedTimeout) {
+    final TaskHeartbeatHandler hb =
+        new TaskHeartbeatHandler(null, SystemClock.getInstance(), 1);
+    hb.init(conf);
+
+    Assert.assertTrue("The value of the task timeout is incorrect.",
+        hb.getTaskTimeOut() == expectedTimeout);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0b07678/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 924fc39..d6ce7a3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ExitUtil;
@@ -780,9 +781,10 @@ abstract public class Task implements Writable, Configurable {
       int remainingRetries = MAX_RETRIES;
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
-      long taskProgressInterval =
-          conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL,
-                       MRJobConfig.DEFAULT_TASK_PROGRESS_REPORT_INTERVAL);
+
+      long taskProgressInterval = MRJobConfUtil.
+          getTaskProgressReportInterval(conf);
+
       while (!taskDone.get()) {
         synchronized (lock) {
           done = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0b07678/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 9a88669..60d357f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -54,11 +54,6 @@ public interface MRJobConfig {
   // negative values disable the limit
   public static final long DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES = -1;
 
-  public static final String TASK_PROGRESS_REPORT_INTERVAL =
-      "mapreduce.task.progress-report.interval";
-  /** The number of milliseconds between progress reports. */
-  public static final int DEFAULT_TASK_PROGRESS_REPORT_INTERVAL = 3000;
-
   public static final String JAR = "mapreduce.job.jar";
 
   public static final String ID = "mapreduce.job.id";
@@ -255,6 +250,10 @@ public interface MRJobConfig {
   public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
   
   public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
+  long DEFAULT_TASK_TIMEOUT_MILLIS = 5 * 60 * 1000L;
+
+  String TASK_PROGRESS_REPORT_INTERVAL =
+      "mapreduce.task.progress-report.interval";
 
   public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0b07678/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
index 11d49a4..afedef3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
@@ -42,4 +42,20 @@ public final class MRJobConfUtil {
    */
   private MRJobConfUtil() {
   }
+
+  /**
+   * Get the progress heartbeat interval configuration for mapreduce tasks.
+   * By default, the value of progress heartbeat interval is a proportion of
+   * that of task timeout.
+   * @param conf  the job configuration to read from
+   * @return the value of task progress report interval
+   */
+  public static long getTaskProgressReportInterval(final Configuration conf) {
+    long taskHeartbeatTimeOut = conf.getLong(
+        MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
+    return conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL,
+        (long) (TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO * taskHeartbeatTimeOut));
+  }
+
+  public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to