MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e42d058 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e42d058 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e42d058 Branch: refs/heads/HDFS-7240 Commit: 6e42d058292d9656e9ebc9a47be13280e3c919ea Parents: 09efdfe Author: Jason Lowe <[email protected]> Authored: Wed Jan 17 08:14:11 2018 -0600 Committer: Jason Lowe <[email protected]> Committed: Wed Jan 17 08:14:11 2018 -0600 ---------------------------------------------------------------------- .../lib/output/FileOutputCommitter.java | 22 +++++++++++ .../src/main/resources/mapred-default.xml | 11 ++++++ .../lib/output/TestFileOutputCommitter.java | 39 ++++++++++++++++++-- 3 files changed, 68 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 86af2cf..cbae575 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -87,6 +87,17 @@ public class FileOutputCommitter extends PathOutputCommitter { // default value to be 1 to keep consistent with 
previous behavior public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1; + // Whether tasks should delete their task temporary directories. This is + // purely an optimization for filesystems without O(1) recursive delete, as + // commitJob will recursively delete the entire job temporary directory. + // HDFS has O(1) recursive delete, so this parameter is left false by default. + // Users of object stores, for example, may want to set this to true. Note: + // this is only used if mapreduce.fileoutputcommitter.algorithm.version=2 + public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED = + "mapreduce.fileoutputcommitter.task.cleanup.enabled"; + public static final boolean + FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false; + private Path outputPath = null; private Path workPath = null; private final int algorithmVersion; @@ -586,6 +597,17 @@ public class FileOutputCommitter extends PathOutputCommitter { mergePaths(fs, taskAttemptDirStatus, outputPath); LOG.info("Saved output of task '" + attemptId + "' to " + outputPath); + + if (context.getConfiguration().getBoolean( + FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, + FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) { + LOG.debug(String.format( + "Deleting the temporary directory of '%s': '%s'", + attemptId, taskAttemptPath)); + if(!fs.delete(taskAttemptPath, true)) { + LOG.warn("Could not delete " + taskAttemptPath); + } + } } } else { LOG.warn("No Output found for " + attemptId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 
1e432ce..62f3dfa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1495,6 +1495,17 @@ </property> <property> + <name>mapreduce.fileoutputcommitter.task.cleanup.enabled</name> + <value>false</value> + <description>Whether tasks should delete their task temporary directories. This is purely an + optimization for filesystems without O(1) recursive delete, as commitJob will recursively delete + the entire job temporary directory. HDFS has O(1) recursive delete, so this parameter is left + false by default. Users of object stores, for example, may want to set this to true. + + Note: this is only used if mapreduce.fileoutputcommitter.algorithm.version=2</description> +</property> + +<property> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <value>1000</value> <description>The interval in ms at which the MR AppMaster should send http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e42d058/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index f72aa55..cd9d44b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -255,13 +255,18 @@ public class TestFileOutputCommitter { assert(dataFileFound && indexFileFound); } - private void testCommitterInternal(int version) throws Exception { + private void testCommitterInternal(int version, boolean taskCleanup) + throws Exception { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); - conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + conf.setInt( + FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); + conf.setBoolean( + FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, + taskCleanup); JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); @@ -275,9 +280,30 @@ public class TestFileOutputCommitter { RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); writeOutput(theRecordWriter, tContext); + // check task and job temp directories exist + File jobOutputDir = new File( + new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString()); + File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority( + committer.getWorkPath()).toString()); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + assertTrue("task temp dir does not exist", taskOutputDir.exists()); + // do commit committer.commitTask(tContext); + assertTrue("job temp dir does not exist", jobOutputDir.exists()); + if (version == 1 || taskCleanup) { + // Task temp dir gets renamed in v1 and deleted if taskCleanup is + // enabled in v2 + assertFalse("task temp dir still exists", taskOutputDir.exists()); + } else { + // By default, in v2 the task temp dir 
is only deleted during commitJob + assertTrue("task temp dir does not exist", taskOutputDir.exists()); + } + + // Entire job temp directory gets deleted, including task temp dir committer.commitJob(jContext); + assertFalse("job temp dir still exists", jobOutputDir.exists()); + assertFalse("task temp dir still exists", taskOutputDir.exists()); // validate output validateContent(outDir); @@ -286,12 +312,17 @@ public class TestFileOutputCommitter { @Test public void testCommitterV1() throws Exception { - testCommitterInternal(1); + testCommitterInternal(1, false); } @Test public void testCommitterV2() throws Exception { - testCommitterInternal(2); + testCommitterInternal(2, false); + } + + @Test + public void testCommitterV2TaskCleanupEnabled() throws Exception { + testCommitterInternal(2, true); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
