Repository: hadoop Updated Branches: refs/heads/branch-3.0 e4dcc3e60 -> 4c238b50d
MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla (cherry picked from commit 4d8de7ab690ef919b392b12d856482a6a1f2bb3d) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c238b50 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c238b50 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c238b50 Branch: refs/heads/branch-3.0 Commit: 4c238b50dfd83a10923bfd6eb28d7a6ad864a40f Parents: e4dcc3e Author: Jason Lowe <jl...@apache.org> Authored: Wed Nov 28 14:54:59 2018 -0600 Committer: Jason Lowe <jl...@apache.org> Committed: Wed Nov 28 16:10:02 2018 -0600 ---------------------------------------------------------------------- .../lib/output/FileOutputCommitter.java | 28 +++++++++++------ .../lib/output/TestFileOutputCommitter.java | 33 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c238b50/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 86af2cf..0ed3259 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -389,7 +390,7 @@ public class FileOutputCommitter extends PathOutputCommitter { if (algorithmVersion == 1) { for (FileStatus stat: getAllCommittedTaskPaths(context)) { - mergePaths(fs, stat, finalOutput); + mergePaths(fs, stat, finalOutput, context); } } @@ -440,10 +441,11 @@ public class FileOutputCommitter extends PathOutputCommitter { * @throws IOException on any error */ private void mergePaths(FileSystem fs, final FileStatus from, - final Path to) throws IOException { + final Path to, JobContext context) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Merging data from " + from + " to " + to); } + reportProgress(context); FileStatus toStat; try { toStat = fs.getFileStatus(to); @@ -467,22 +469,28 @@ public class FileOutputCommitter extends PathOutputCommitter { if (!fs.delete(to, true)) { throw new IOException("Failed to delete " + to); } - renameOrMerge(fs, from, to); + renameOrMerge(fs, from, to, context); } else { //It is a directory so merge everything in the directories for (FileStatus subFrom : fs.listStatus(from.getPath())) { Path subTo = new Path(to, subFrom.getPath().getName()); - mergePaths(fs, subFrom, subTo); + mergePaths(fs, subFrom, subTo, context); } } } else { - renameOrMerge(fs, from, to); + renameOrMerge(fs, from, to, context); } } } - private void renameOrMerge(FileSystem fs, FileStatus from, Path to) - throws IOException { + private void reportProgress(JobContext context) { + if (context instanceof Progressable) { + ((Progressable) context).progress(); + } + } + + private void renameOrMerge(FileSystem fs, FileStatus from, Path to, + JobContext context) throws IOException { if (algorithmVersion == 1) { if (!fs.rename(from.getPath(), to)) { throw new IOException("Failed to rename " + from + " to " + to); @@ -491,7 +499,7 @@ public class FileOutputCommitter extends PathOutputCommitter { fs.mkdirs(to); for (FileStatus subFrom : fs.listStatus(from.getPath())) { Path subTo = new Path(to, subFrom.getPath().getName()); - mergePaths(fs, subFrom, subTo); + mergePaths(fs, subFrom, subTo, context); } } } @@ -583,7 +591,7 @@ public class FileOutputCommitter extends PathOutputCommitter { committedTaskPath); } else { // directly merge everything from taskAttemptPath to output directory - mergePaths(fs, taskAttemptDirStatus, outputPath); + mergePaths(fs, taskAttemptDirStatus, outputPath, context); LOG.info("Saved output of task '" + attemptId + "' to " + outputPath); } @@ -696,7 +704,7 @@ public class FileOutputCommitter extends PathOutputCommitter { FileStatus from = fs.getFileStatus(previousCommittedTaskPath); LOG.info("Recovering task for upgrading scenario, moving files from " + previousCommittedTaskPath + " to " + outputPath); - mergePaths(fs, from, outputPath); + mergePaths(fs, from, outputPath, context); } catch (FileNotFoundException ignored) { } LOG.info("Done recovering task " + attemptId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c238b50/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index f72aa55..594fd1c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -57,6 +57,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + @SuppressWarnings("unchecked") public class TestFileOutputCommitter { private static final Path outDir = new Path( @@ -403,6 +407,35 @@ public class TestFileOutputCommitter { } @Test + public void testProgressDuringMerge() throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + 2); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = spy(new TaskAttemptContextImpl(conf, taskID)); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + MapFileOutputFormat theOutputFormat = new MapFileOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeMapFileOutput(theRecordWriter, tContext); + + // do commit + committer.commitTask(tContext); + //make sure progress flag was set. + // The first time it is set is during commit but ensure that + // mergePaths call makes it go again. + verify(tContext, atLeast(2)).progress(); + } + + @Test public void testCommitterRepeatableV1() throws Exception { testCommitterRetryInternal(1); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org