MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cce71dce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cce71dce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cce71dce

Branch: refs/heads/HDFS-7240
Commit: cce71dceef9e82d31fe8ec59648b2a4a50c8869a
Parents: 4aca4ff
Author: Haibo Chen <[email protected]>
Authored: Fri Jan 19 12:56:17 2018 -0800
Committer: Haibo Chen <[email protected]>
Committed: Fri Jan 19 12:56:17 2018 -0800

----------------------------------------------------------------------
 .../hadoop/mapreduce/v2/app/MRAppMaster.java  | 23 ++++++
 .../hadoop/mapreduce/v2/app/TestRecovery.java | 82 ++++++++++++++++++++
 2 files changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cce71dce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index df61928..e6a45cf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -1217,6 +1219,7 @@ public class MRAppMaster extends CompositeService {
       amInfos = new LinkedList<AMInfo>();
       completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
       processRecovery();
+      cleanUpPreviousJobOutput();
 
       // Current an AMInfo for the current AM generation.
       AMInfo amInfo =
@@ -1395,6 +1398,26 @@ public class MRAppMaster extends CompositeService {
     return true;
   }
 
+  private void cleanUpPreviousJobOutput() {
+    // recovered application masters should not remove data from previous job
+    if (!recovered()) {
+      JobContext jobContext = getJobContextFromConf(getConfig());
+      try {
+        LOG.info("Starting to clean up previous job's temporary files");
+        this.committer.abortJob(jobContext, State.FAILED);
+        LOG.info("Finished cleaning up previous job temporary files");
+      } catch (FileNotFoundException e) {
+        LOG.info("Previous job temporary files do not exist, "
+            + "no clean up was necessary.");
+      } catch (Exception e) {
+        // the clean up of a previous attempt is not critical to the success
+        // of this job - only logging the error
+        LOG.error("Error while trying to clean up previous job's temporary "
+            + "files", e);
+      }
+    }
+  }
+
   private static FSDataInputStream getPreviousJobHistoryStream(
       Configuration conf, ApplicationAttemptId appAttemptId)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cce71dce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 17e07b1..893c4a0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
+
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -43,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobID;
@@ -452,6 +455,8 @@ public class TestRecovery {
 
   public static class TestFileOutputCommitter extends
       org.apache.hadoop.mapred.FileOutputCommitter {
+
+    private boolean abortJobCalled;
+
     @Override
     public boolean isRecoverySupported(
         org.apache.hadoop.mapred.JobContext jobContext) {
@@ -462,6 +467,16 @@ public class TestRecovery {
       }
       return isRecoverySupported;
     }
+
+    @Override
+    public void abortJob(JobContext context, int runState) throws IOException {
+      super.abortJob(context, runState);
+      this.abortJobCalled = true;
+    }
+
+    private boolean isAbortJobCalled() {
+      return this.abortJobCalled;
+    }
   }
 
   /**
@@ -1010,6 +1025,73 @@ public class TestRecovery {
   }
 
   @Test
+  public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    //stop the app before the job completes.
+    app.stop();
+    app.close();
+
+    //rerun
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    TestFileOutputCommitter committer = (
+        TestFileOutputCommitter) app.getCommitter();
+    assertTrue("commiter.abortJob() has not been called",
+        committer.isAbortJobCalled());
+    app.close();
+  }
+
+  @Test
+  public void testPreviousJobIsNotCleanedWhenRecovery()
+      throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    // TestFileOutputCommitter supports recovery if want.am.recovery=true
+    conf.setBoolean("want.am.recovery", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    //stop the app before the job completes.
+    app.stop();
+    app.close();
+
+    //rerun
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    TestFileOutputCommitter committer = (
+        TestFileOutputCommitter) app.getCommitter();
+    assertFalse("commiter.abortJob() has been called",
+        committer.isAbortJobCalled());
+    app.close();
+  }
+
+  @Test
   public void testOutputRecoveryMapsOnly() throws Exception {
     int runCount = 0;
     MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
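For readers who want to observe this cleanup with their own committer, the sketch below mirrors the TestFileOutputCommitter pattern from the test above: an old-API committer that records whether abortJob() was invoked. It is a minimal illustration, not part of this commit; the class name AbortTrackingOutputCommitter and the abortJobCalled flag are made up for the example. With AM job recovery disabled (MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE set to false, as in testPreviousJobOutputCleanedWhenNoRecovery), a restarted AM is expected to call abortJob(jobContext, State.FAILED) on the previous attempt's output before the new attempt starts.

import java.io.IOException;

import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobContext;

/**
 * Illustrative committer that records whether abortJob() was called.
 * Plug it in via mapred.output.committer.class, as the tests above do.
 */
public class AbortTrackingOutputCommitter extends FileOutputCommitter {

  private volatile boolean abortJobCalled;

  @Override
  public void abortJob(JobContext context, int runState) throws IOException {
    // Delegate to FileOutputCommitter, which removes the temporary output
    // directory, then remember that the cleanup path was exercised.
    super.abortJob(context, runState);
    abortJobCalled = true;
  }

  public boolean isAbortJobCalled() {
    return abortJobCalled;
  }
}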
