Repository: hadoop Updated Branches: refs/heads/branch-2 cb19552fb -> be72ed1c7
MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk. Contributed by Maysam Yabandeh Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be72ed1c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be72ed1c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be72ed1c Branch: refs/heads/branch-2 Commit: be72ed1c7558987029497da6210238f8803416f4 Parents: cb19552 Author: Jason Lowe <[email protected]> Authored: Wed Oct 21 14:10:28 2015 +0000 Committer: Jason Lowe <[email protected]> Committed: Wed Oct 21 14:11:48 2015 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../java/org/apache/hadoop/mapred/Task.java | 72 ++++++++++++++--- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 ++ .../src/main/resources/mapred-default.xml | 12 +++ .../hadoop/mapred/TestTaskProgressReporter.java | 83 +++++++++++++++++++- 5 files changed, 164 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ea90466..940ae6c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -127,6 +127,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6479. Add missing mapred job command options in mapreduce document. (nijel via aajisaka) + MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk + (Maysam Yabandeh via jlowe) + OPTIMIZATIONS MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 5031368..794ca07 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.io.BytesWritable; @@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.mapreduce.task.ReduceContextImpl; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; @@ -729,11 +731,49 @@ abstract public class Task implements Writable, Configurable { } else { return split; } - } - /** - * The communication thread handles communication with the parent (Task Tracker). - * It sends progress updates if progress has been made or if the task needs to - * let the parent know that it's alive. It also pings the parent to see if it's alive. + } + + /** + * exception thrown when the task exceeds some configured limits. + */ + public class TaskLimitException extends IOException { + public TaskLimitException(String str) { + super(str); + } + } + + /** + * check the counters to see whether the task has exceeded any configured + * limits. + * @throws TaskLimitException + */ + protected void checkTaskLimits() throws TaskLimitException { + // check the limit for writing to local file system + long limit = conf.getLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, + MRJobConfig.DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES); + if (limit >= 0) { + Counters.Counter localWritesCounter = null; + try { + LocalFileSystem localFS = FileSystem.getLocal(conf); + localWritesCounter = counters.findCounter(localFS.getScheme(), + FileSystemCounter.BYTES_WRITTEN); + } catch (IOException e) { + LOG.warn("Could not get LocalFileSystem BYTES_WRITTEN counter"); + } + if (localWritesCounter != null + && localWritesCounter.getCounter() > limit) { + throw new TaskLimitException("too much write to local file system." + + " current value is " + localWritesCounter.getCounter() + + " the limit is " + limit); + } + } + } + + /** + * The communication thread handles communication with the parent (Task + * Tracker). It sends progress updates if progress has been made or if + * the task needs to let the parent know that it's alive. It also pings + * the parent to see if it's alive. */ public void run() { final int MAX_RETRIES = 3; @@ -763,8 +803,9 @@ abstract public class Task implements Writable, Configurable { if (sendProgress) { // we need to send progress update updateCounters(); + checkTaskLimits(); taskStatus.statusUpdate(taskProgress.get(), - taskProgress.toString(), + taskProgress.toString(), counters); taskFound = umbilical.statusUpdate(taskId, taskStatus); taskStatus.clearStatus(); @@ -782,10 +823,21 @@ abstract public class Task implements Writable, Configurable { System.exit(66); } - sendProgress = resetProgressFlag(); + sendProgress = resetProgressFlag(); remainingRetries = MAX_RETRIES; - } - catch (Throwable t) { + } catch (TaskLimitException e) { + String errMsg = "Task exceeded the limits: " + + StringUtils.stringifyException(e); + LOG.fatal(errMsg); + try { + umbilical.fatalError(taskId, errMsg); + } catch (IOException ioe) { + LOG.fatal("Failed to update failure diagnosis", ioe); + } + LOG.fatal("Killing " + taskId); + resetDoneFlag(); + ExitUtil.terminate(69); + } catch (Throwable t) { LOG.info("Communication exception: " + StringUtils.stringifyException(t)); remainingRetries -=1; if (remainingRetries == 0) { @@ -1043,7 +1095,7 @@ abstract public class Task implements Writable, Configurable { TaskReporter reporter ) throws IOException, InterruptedException { LOG.info("Task:" + taskId + " is done." - + " And is in the process of committing"); + + " And is in the process of committing"); updateCounters(); boolean commitRequired = isCommitRequired(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 401aedb..0ed2f29 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -49,6 +49,11 @@ public interface MRJobConfig { public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed"; + public static final String TASK_LOCAL_WRITE_LIMIT_BYTES = + "mapreduce.task.local-fs.write-limit.bytes"; + // negative values disable the limit + public static final long DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES = -1; + public static final String TASK_PROGRESS_REPORT_INTERVAL = "mapreduce.task.progress-report.interval"; /** The number of milliseconds between progress reports. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 0d15ca8..ae84f8c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1781,4 +1781,16 @@ </description> </property> +<property> + <name>mapreduce.task.local-fs.write-limit.bytes</name> + <value>-1</value> + <description>Limit on the byte written to the local file system by each task. + This limit only applies to writes that go through the Hadoop filesystem APIs + within the task process (i.e.: writes that will update the local filesystem's + BYTES_WRITTEN counter). It does not cover other writes such as logging, + sideband writes from subprocesses (e.g.: streaming jobs), etc. + Negative values disable the limit. + default is -1</description> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java index 66632b3..b20ad96 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java @@ -19,15 +19,28 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.util.ExitUtil; import org.junit.Assert; import org.junit.Test; public class TestTaskProgressReporter { private static int statusUpdateTimes = 0; + + // set to true if the thread is existed with ExitUtil.terminate + volatile boolean threadExited = false; + + final static int LOCAL_BYTES_WRITTEN = 1024; + private FakeUmbilical fakeUmbilical = new FakeUmbilical(); private static class DummyTask extends Task { @@ -120,13 +133,22 @@ public class TestTaskProgressReporter { } private class DummyTaskReporter extends Task.TaskReporter { + volatile boolean taskLimitIsChecked = false; + public DummyTaskReporter(Task task) { task.super(task.getProgress(), fakeUmbilical); } + @Override public void setProgress(float progress) { super.setProgress(progress); } + + @Override + protected void checkTaskLimits() throws TaskLimitException { + taskLimitIsChecked = true; + super.checkTaskLimits(); + } } @Test (timeout=10000) @@ -144,4 +166,63 @@ public class TestTaskProgressReporter { t.join(); Assert.assertEquals(statusUpdateTimes, 2); } -} \ No newline at end of file + + @Test(timeout=10000) + public void testBytesWrittenRespectingLimit() throws Exception { + // add 1024 to the limit to account for writes not controlled by the test + testBytesWrittenLimit(LOCAL_BYTES_WRITTEN + 1024, false); + } + + @Test(timeout=10000) + public void testBytesWrittenExceedingLimit() throws Exception { + testBytesWrittenLimit(LOCAL_BYTES_WRITTEN - 1, true); + } + + /** + * This is to test the limit on BYTES_WRITTEN. The test is limited in that + * the check is done only once at the first loop of TaskReport#run. + * @param limit the limit on BYTES_WRITTEN in local file system + * @param failFast should the task fail fast with such limit? + * @throws Exception + */ + public void testBytesWrittenLimit(long limit, boolean failFast) + throws Exception { + ExitUtil.disableSystemExit(); + threadExited = false; + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + System.out.println("Uncaught exception: " + ex); + if (ex instanceof ExitUtil.ExitException) { + threadExited = true; + } + } + }; + JobConf conf = new JobConf(); + // To disable task reporter sleeping + conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0); + conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit); + LocalFileSystem localFS = FileSystem.getLocal(conf); + Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-" + + new Random(System.currentTimeMillis()).nextInt()); + FSDataOutputStream out = localFS.create(tmpPath, true); + out.write(new byte[LOCAL_BYTES_WRITTEN]); + out.close(); + + Task task = new DummyTask(); + task.setConf(conf); + DummyTaskReporter reporter = new DummyTaskReporter(task); + Thread t = new Thread(reporter); + t.setUncaughtExceptionHandler(h); + reporter.setProgressFlag(); + + t.start(); + while (!reporter.taskLimitIsChecked) { + Thread.yield(); + } + + task.setTaskDone(); + reporter.resetDoneFlag(); + t.join(); + Assert.assertEquals(failFast, threadExited); + } +}
