Repository: hadoop
Updated Branches:
  refs/heads/branch-3.2 df0e7766e -> 7a78bdf7b


MAPREDUCE-7164. FileOutputCommitter does not report progress while merging 
paths. Contributed by Kuhu Shukla

(cherry picked from commit 4d8de7ab690ef919b392b12d856482a6a1f2bb3d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7a78bdf7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7a78bdf7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7a78bdf7

Branch: refs/heads/branch-3.2
Commit: 7a78bdf7bbf278678dc10de3133930723972b60d
Parents: df0e776
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Nov 28 14:54:59 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Nov 28 15:54:59 2018 -0600

----------------------------------------------------------------------
 .../lib/output/FileOutputCommitter.java         | 28 +++++++++++------
 .../lib/output/TestFileOutputCommitter.java     | 33 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a78bdf7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index cbae575..94af338 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -400,7 +401,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
 
       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }
 
@@ -451,10 +452,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -478,22 +480,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }
 
-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -502,7 +510,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -594,7 +602,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
               committedTaskPath);
         } else {
           // directly merge everything from taskAttemptPath to output directory
-          mergePaths(fs, taskAttemptDirStatus, outputPath);
+          mergePaths(fs, taskAttemptDirStatus, outputPath, context);
           LOG.info("Saved output of task '" + attemptId + "' to " +
               outputPath);
 
@@ -718,7 +726,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
           FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
           LOG.info("Recovering task for upgrading scenario, moving files from "
               + previousCommittedTaskPath + " to " + outputPath);
-          mergePaths(fs, from, outputPath);
+          mergePaths(fs, from, outputPath, context);
         } catch (FileNotFoundException ignored) {
         }
         LOG.info("Done recovering task " + attemptId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a78bdf7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index fc43dce..dd717a6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -58,6 +58,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 @SuppressWarnings("unchecked")
 public class TestFileOutputCommitter {
   private static final Path outDir = new Path(
@@ -435,6 +439,35 @@ public class TestFileOutputCommitter {
   }
 
   @Test
+  public void testProgressDuringMerge() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        2);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = spy(new TaskAttemptContextImpl(conf, taskID));
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeMapFileOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    //make sure progress flag was set.
+    // The first time it is set is during commit but ensure that
+    // mergePaths call makes it go again.
+    verify(tContext, atLeast(2)).progress();
+  }
+
+  @Test
   public void testCommitterRepeatableV1() throws Exception {
     testCommitterRetryInternal(1);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to