Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 e1716c7cf -> f95f416fa


MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. 
Contributed by Junping Du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f95f416f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f95f416f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f95f416f

Branch: refs/heads/branch-2.7
Commit: f95f416fad4a4acad2f39065ac20a1b2c0b07bbb
Parents: e1716c7
Author: Jian He <jia...@apache.org>
Authored: Mon Nov 16 17:13:49 2015 -0800
Committer: Jian He <jia...@apache.org>
Committed: Mon Nov 16 17:13:49 2015 -0800

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  57 ++++--
 .../v2/app/commit/CommitterEventHandler.java    |  25 ++-
 .../hadoop/mapred/FileOutputCommitter.java      |   7 +-
 .../apache/hadoop/mapred/OutputCommitter.java   |  34 +++-
 .../hadoop/mapreduce/OutputCommitter.java       |  26 +++
 .../lib/output/FileOutputCommitter.java         |  70 ++++++-
 .../hadoop/mapred/TestFileOutputCommitter.java  | 151 ++++++++++++++
 .../lib/output/TestFileOutputCommitter.java     | 197 ++++++++++++++++++-
 9 files changed, 542 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt 
b/hadoop-mapreduce-project/CHANGES.txt
index 3bd820f..fffcf5b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -14,6 +14,9 @@ Release 2.7.3 - UNRELEASED
 
     MAPREDUCE-6540. TestMRTimelineEventHandling fails (sjlee)
 
+    MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter 
API. 
+    (Junping Du via jianhe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 26caa2d..4e0724d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -281,10 +281,12 @@ public class MRAppMaster extends CompositeService {
     }
     
     boolean copyHistory = false;
+    committer = createOutputCommitter(conf);
     try {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       Path stagingDir = MRApps.getStagingAreaDir(conf, user);
       FileSystem fs = getFileSystem(conf);
+
       boolean stagingExists = fs.exists(stagingDir);
       Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
       boolean commitStarted = fs.exists(startCommitFile);
@@ -317,9 +319,16 @@ public class MRAppMaster extends CompositeService {
           shutDownMessage = "We crashed after a commit failure.";
           forcedState = JobStateInternal.FAILED;
         } else {
-          //The commit is still pending, commit error
-          shutDownMessage = "We crashed durring a commit";
-          forcedState = JobStateInternal.ERROR;
+          if (isCommitJobRepeatable()) {
+            // cleanup previous half done commits if committer supports
+            // repeatable job commit.
+            errorHappenedShutDown = false;
+            cleanupInterruptedCommit(conf, fs, startCommitFile);
+          } else {
+            //The commit is still pending, commit error
+            shutDownMessage = "We crashed durring a commit";
+            forcedState = JobStateInternal.ERROR;
+          }
         }
       }
     } catch (IOException e) {
@@ -374,7 +383,6 @@ public class MRAppMaster extends CompositeService {
         addIfService(cpHist);
       }
     } else {
-      committer = createOutputCommitter(conf);
 
       dispatcher = createDispatcher();
       addIfService(dispatcher);
@@ -454,6 +462,38 @@ public class MRAppMaster extends CompositeService {
     return new AsyncDispatcher();
   }
 
+  private boolean isCommitJobRepeatable() throws IOException {
+    boolean isRepeatable = false;
+    Configuration conf = getConfig();
+    if (committer != null) {
+      final JobContext jobContext = getJobContextFromConf(conf);
+
+      isRepeatable = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isCommitJobRepeatable(jobContext);
+            }
+          });
+    }
+    return isRepeatable;
+  }
+
+  private JobContext getJobContextFromConf(Configuration conf) {
+    if (newApiCommitter) {
+      return new JobContextImpl(conf, TypeConverter.fromYarn(getJobId()));
+    } else {
+      return new org.apache.hadoop.mapred.JobContextImpl(
+          new JobConf(conf), TypeConverter.fromYarn(getJobId()));
+    }
+  }
+
+  private void cleanupInterruptedCommit(Configuration conf,
+      FileSystem fs, Path startCommitFile) throws IOException {
+    LOG.info("Delete startJobCommitFile in case commit is not finished as " +
+        "successful or failed.");
+    fs.delete(startCommitFile, false);
+  }
+
   private OutputCommitter createOutputCommitter(Configuration conf) {
     return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
       public OutputCommitter call(Configuration conf) {
@@ -1131,14 +1171,7 @@ public class MRAppMaster extends CompositeService {
     boolean isSupported = false;
     Configuration conf = getConfig();
     if (committer != null) {
-      final JobContext _jobContext;
-      if (newApiCommitter) {
-         _jobContext = new JobContextImpl(
-            conf, TypeConverter.fromYarn(getJobId()));
-      } else {
-          _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-                new JobConf(conf), TypeConverter.fromYarn(getJobId()));
-      }
+      final JobContext _jobContext = getJobContextFromConf(conf);
       isSupported = callWithJobClassLoader(conf,
           new ExceptionAction<Boolean>() {
             public Boolean call(Configuration conf) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
index d56c1e5..b53955f 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
@@ -261,27 +261,38 @@ public class CommitterEventHandler extends AbstractService
       }
     }
 
-    private void touchz(Path p) throws IOException {
-      fs.create(p, false).close();
+    // If job commit is repeatable, then we should allow
+    // startCommitFile/endCommitSuccessFile/endCommitFailureFile to be written
+    // by other AM before.
+    private void touchz(Path p, boolean overwrite) throws IOException {
+      fs.create(p, overwrite).close();
     }
-    
+
     @SuppressWarnings("unchecked")
     protected void handleJobCommit(CommitterJobCommitEvent event) {
+      boolean commitJobIsRepeatable = false;
       try {
-        touchz(startCommitFile);
+        commitJobIsRepeatable = committer.isCommitJobRepeatable(
+            event.getJobContext());
+      } catch (IOException e) {
+        LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
+      }
+
+      try {
+        touchz(startCommitFile, commitJobIsRepeatable);
         jobCommitStarted();
         waitForValidCommitWindow();
         committer.commitJob(event.getJobContext());
-        touchz(endCommitSuccessFile);
+        touchz(endCommitSuccessFile, commitJobIsRepeatable);
         context.getEventHandler().handle(
             new JobCommitCompletedEvent(event.getJobID()));
       } catch (Exception e) {
+        LOG.error("Could not commit job", e);
         try {
-          touchz(endCommitFailureFile);
+          touchz(endCommitFailureFile, commitJobIsRepeatable);
         } catch (Exception e2) {
           LOG.error("could not create failure file.", e2);
         }
-        LOG.error("Could not commit job", e);
         context.getEventHandler().handle(
             new JobCommitFailedEvent(event.getJobID(),
                 StringUtils.stringifyException(e)));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
index 77d06b6..c44bb37 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
@@ -182,7 +182,7 @@ public class FileOutputCommitter extends OutputCommitter {
   throws IOException {
     return getWrapped(context).needsTaskCommit(context, 
getTaskAttemptPath(context));
   }
-  
+
   @Override
   @Deprecated
   public boolean isRecoverySupported() {
@@ -190,6 +190,11 @@ public class FileOutputCommitter extends OutputCommitter {
   }
 
   @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return getWrapped(context).isCommitJobRepeatable(context);
+  }
+
+  @Override
   public boolean isRecoverySupported(JobContext context) throws IOException {
     return getWrapped(context).isRecoverySupported(context);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
index 79df7f8..f774456 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
@@ -192,7 +192,7 @@ public abstract class OutputCommitter
    * 
    * If task output recovery is supported, job restart can be done more
    * efficiently.
-   * 
+   *
    * @param jobContext
    *          Context of the job whose output is being written.
    * @return <code>true</code> if task output recovery is supported,
@@ -205,6 +205,38 @@ public abstract class OutputCommitter
   }
 
   /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be 
running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws
+      IOException {
+    return false;
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext
+      jobContext) throws IOException {
+    return isCommitJobRepeatable((JobContext) jobContext);
+  }
+
+  /**
    * Recover the task output. 
    * 
    * The retry-count for the job will be passed via the 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
index cb44f63..53af617 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
@@ -190,6 +190,32 @@ public abstract class OutputCommitter {
   }
 
   /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be 
running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext)
+      throws IOException {
+    return false;
+  }
+
+  /**
    * Is task output recovery supported for restarting jobs?
    * 
    * If task output recovery is supported, job restart can be done more

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 6e5d0a1..a9305dd 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
@@ -64,6 +66,12 @@ public class FileOutputCommitter extends OutputCommitter {
   public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
       "mapreduce.fileoutputcommitter.algorithm.version";
   public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
+  // Number of attempts when failure happens in commit job
+  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
+      "mapreduce.fileoutputcommitter.failures.attempts";
+
+  // default value to be 1 to keep consistent with previous behavior
+  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
   private Path outputPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
@@ -311,12 +319,40 @@ public class FileOutputCommitter extends OutputCommitter {
   }
 
   /**
-   * The job has completed so move all committed tasks to the final output dir.
+   * The job has completed, so do works in commitJobInternal().
+   * Could retry on failure if using algorithm 2.
+   * @param context the job's context
+   */
+  public void commitJob(JobContext context) throws IOException {
+    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
+        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
+    int attempt = 0;
+    boolean jobCommitNotFinished = true;
+    while (jobCommitNotFinished) {
+      try {
+        commitJobInternal(context);
+        jobCommitNotFinished = false;
+      } catch (Exception e) {
+        if (++attempt >= maxAttemptsOnFailure) {
+          throw e;
+        } else {
+          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
+              ") time.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * The job has completed, so do following commit job, include:
+   * Move all committed tasks to the final output dir (algorithm 1 only).
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
-  public void commitJob(JobContext context) throws IOException {
+  @VisibleForTesting
+  protected void commitJobInternal(JobContext context) throws IOException {
     if (hasOutputPath()) {
       Path finalOutput = getOutputPath();
       FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
@@ -331,9 +367,17 @@ public class FileOutputCommitter extends OutputCommitter {
       cleanupJob(context);
       // True if the job requires output.dir marked on successful job.
       // Note that by default it is set to true.
-      if 
(context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) 
{
+      if (context.getConfiguration().getBoolean(
+          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
         Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-        fs.create(markerPath).close();
+        // If job commit is repeatable and previous/another AM could write
+        // mark file already, we need to set overwritten to be true explicitly
+        // in case other FS implementations don't overwritten by default.
+        if (isCommitJobRepeatable(context)) {
+          fs.create(markerPath, true).close();
+        } else {
+          fs.create(markerPath).close();
+        }
       }
     } else {
       LOG.warn("Output Path is null in commitJob()");
@@ -412,7 +456,16 @@ public class FileOutputCommitter extends OutputCommitter {
       Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
       FileSystem fs = pendingJobAttemptsPath
           .getFileSystem(context.getConfiguration());
-      fs.delete(pendingJobAttemptsPath, true);
+      // if job allow repeatable commit and pendingJobAttemptsPath could be
+      // deleted by previous AM, we should tolerate FileNotFoundException in
+      // this case.
+      try {
+        fs.delete(pendingJobAttemptsPath, true);
+      } catch (FileNotFoundException e) {
+        if (!isCommitJobRepeatable(context)) {
+          throw e;
+        }
+      }
     } else {
       LOG.warn("Output Path is null in cleanupJob()");
     }
@@ -548,7 +601,12 @@ public class FileOutputCommitter extends OutputCommitter {
   public boolean isRecoverySupported() {
     return true;
   }
-  
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return algorithmVersion == 2;
+  }
+
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
index 3207a71..e15f7ab 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -202,6 +203,112 @@ public class TestFileOutputCommitter extends TestCase {
     assert(dataFileFound && indexFileFound);
   }
 
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts) 
throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
+        FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, maxAttempts);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit again
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit successful: wrong behavior " +
+            "for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testCommitterInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -451,4 +558,48 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed() throws IOException {
+      super();
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      JobConf conf = context.getJobConf();
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped =
+          new CommitterFailedFirst(FileOutputFormat.getOutputPath(conf),
+              context);
+      wrapped.commitJob(context);
+    }
+  }
+
+  public static class CommitterFailedFirst extends
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterFailedFirst(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(org.apache.hadoop.mapreduce.JobContext
+        context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f95f416f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index 0d4ab98..bf77e55 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.lib.output;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -284,6 +286,174 @@ public class TestFileOutputCommitter extends TestCase {
     testCommitterInternal(2);
   }
 
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit job again on a successful commit job.
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit success: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        maxAttempts);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterRepeatableV1() throws Exception {
+    testCommitterRetryInternal(1);
+  }
+
+  public void testCommitterRepeatableV2() throws Exception {
+    testCommitterRetryInternal(2);
+  }
+
+  // retry committer for 2 times.
+  private void testCommitterRetryInternal(int version)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    // only attempt for 1 time.
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        1);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    try {
+      committer.commitJob(jContext);
+      Assert.fail("Commit successful: wrong behavior for the first time " +
+          "commit.");
+    } catch (IOException e) {
+      // commit again.
+      try {
+        committer.commitJob(jContext);
+        // version 1 shouldn't reach to here.
+        if (version == 1) {
+          Assert.fail("Commit successful after retry: wrong behavior for " +
+              "version 1.");
+        }
+      } catch (FileNotFoundException ex) {
+        if (version == 2) {
+          Assert.fail("Commit failed after retry: wrong behavior for" +
+              " version 2.");
+        }
+        assertTrue(ex.getMessage().contains(committer.getJobAttemptPath(
+            jContext).toString() + " does not exist"));
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testMapFileOutputCommitterInternal(int version)
       throws Exception {
     Job job = Job.getInstance();
@@ -292,7 +462,7 @@ public class TestFileOutputCommitter extends TestCase {
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
-    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());    
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
 
@@ -575,4 +745,29 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(JobContext context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
+
 }

Reply via email to