Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 e4dcc3e60 -> 4c238b50d


MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla

(cherry picked from commit 4d8de7ab690ef919b392b12d856482a6a1f2bb3d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c238b50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c238b50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c238b50

Branch: refs/heads/branch-3.0
Commit: 4c238b50dfd83a10923bfd6eb28d7a6ad864a40f
Parents: e4dcc3e
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Nov 28 14:54:59 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Nov 28 16:10:02 2018 -0600

----------------------------------------------------------------------
 .../lib/output/FileOutputCommitter.java         | 28 +++++++++++------
 .../lib/output/TestFileOutputCommitter.java     | 33 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c238b50/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 86af2cf..0ed3259 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -389,7 +390,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
 
       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }
 
@@ -440,10 +441,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -467,22 +469,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }
 
-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -491,7 +499,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -583,7 +591,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
               committedTaskPath);
         } else {
           // directly merge everything from taskAttemptPath to output directory
-          mergePaths(fs, taskAttemptDirStatus, outputPath);
+          mergePaths(fs, taskAttemptDirStatus, outputPath, context);
           LOG.info("Saved output of task '" + attemptId + "' to " +
               outputPath);
         }
@@ -696,7 +704,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
           FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
           LOG.info("Recovering task for upgrading scenario, moving files from "
               + previousCommittedTaskPath + " to " + outputPath);
-          mergePaths(fs, from, outputPath);
+          mergePaths(fs, from, outputPath, context);
         } catch (FileNotFoundException ignored) {
         }
         LOG.info("Done recovering task " + attemptId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c238b50/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index f72aa55..594fd1c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -57,6 +57,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 @SuppressWarnings("unchecked")
 public class TestFileOutputCommitter {
   private static final Path outDir = new Path(
@@ -403,6 +407,35 @@ public class TestFileOutputCommitter {
   }
 
   @Test
+  public void testProgressDuringMerge() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        2);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = spy(new TaskAttemptContextImpl(conf, 
taskID));
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeMapFileOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    //make sure progress flag was set.
+    // The first time it is set is during commit but ensure that
+    // mergePaths call makes it go again.
+    verify(tContext, atLeast(2)).progress();
+  }
+
+  @Test
   public void testCommitterRepeatableV1() throws Exception {
     testCommitterRetryInternal(1);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to