This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a3433b  HADOOP-16357. TeraSort Job failing on S3 
DirectoryStagingCommitter: destination path exists.
6a3433b is described below

commit 6a3433bffdbdefc5aa66705085bcf6fa089721b2
Author: Steve Loughran <[email protected]>
AuthorDate: Thu Jul 11 18:15:20 2019 +0100

    HADOOP-16357. TeraSort Job failing on S3 DirectoryStagingCommitter: 
destination path exists.
    
    Contributed by Steve Loughran.
    
    This patch
    
    * changes the default for the staging committer to append, as we get for 
the classic FileOutputFormat committer
    * adds a check for the dest path being a file not a dir
    * adds tests for this
    * changes AbstractCommitTerasortIT to not use the simple parser, so it fails 
if the file is present.
    
    Change-Id: Id53742958ed1cf321ff96c9063505d64f3254f53
---
 .../src/main/resources/core-default.xml            |  2 +-
 .../hadoop/fs/s3a/commit/CommitConstants.java      |  2 +-
 .../commit/staging/DirectoryStagingCommitter.java  | 34 ++++++--
 .../fs/s3a/commit/staging/StagingCommitter.java    |  7 +-
 .../site/markdown/tools/hadoop-aws/committers.md   | 11 ++-
 .../apache/hadoop/fs/s3a/MockS3AFileSystem.java    | 11 ++-
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  | 89 +++++++++------------
 .../fs/s3a/commit/AbstractITCommitMRJob.java       |  4 +-
 .../fs/s3a/commit/staging/StagingTestBase.java     | 34 +++++++-
 .../s3a/commit/staging/TestStagingCommitter.java   |  4 +-
 .../TestStagingDirectoryOutputCommitter.java       | 90 ++++++++++++++++++----
 .../staging/TestStagingPartitionedTaskCommit.java  | 38 +--------
 .../integration/ITestDirectoryCommitProtocol.java  | 34 ++++++++
 .../commit/terasort/AbstractCommitTerasortIT.java  | 23 ++++--
 14 files changed, 250 insertions(+), 133 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 5ae60d7..c9a3d51 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1747,7 +1747,7 @@
 
 <property>
   <name>fs.s3a.committer.staging.conflict-mode</name>
-  <value>fail</value>
+  <value>append</value>
   <description>
     Staging committer conflict resolution policy.
     Supported: "fail", "append", "replace".
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 03cfcba..877433b 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -198,7 +198,7 @@ public final class CommitConstants {
   public static final String CONFLICT_MODE_REPLACE = "replace";
 
   /** Default conflict mode: {@value}. */
-  public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_FAIL;
+  public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_APPEND;
 
   /**
    * Number of threads in committers for parallel operations on files
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
index 23bb06b..32642c9 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.commit.staging;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
 
@@ -25,8 +26,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -65,11 +69,31 @@ public class DirectoryStagingCommitter extends 
StagingCommitter {
     super.setupJob(context);
     Path outputPath = getOutputPath();
     FileSystem fs = getDestFS();
-    if (getConflictResolutionMode(context, fs.getConf())
-        == ConflictResolution.FAIL
-        && fs.exists(outputPath)) {
-      throw failDestinationExists(outputPath,
-          "Setting job as " + getRole());
+    ConflictResolution conflictResolution = getConflictResolutionMode(
+        context, fs.getConf());
+    LOG.info("Conflict Resolution mode is {}", conflictResolution);
+    try {
+      final FileStatus status = fs.getFileStatus(outputPath);
+
+      // if it is not a directory, fail fast for all conflict options.
+      if (!status.isDirectory()) {
+        throw new PathExistsException(outputPath.toString(),
+            "output path is not a directory: "
+                + InternalCommitterConstants.E_DEST_EXISTS);
+      }
+      switch(conflictResolution) {
+      case FAIL:
+        throw failDestinationExists(outputPath,
+            "Setting job as " + getRole());
+      case APPEND:
+      case REPLACE:
+        LOG.debug("Destination directory exists; conflict policy permits 
this");
+      }
+    } catch (FileNotFoundException ignored) {
+      // there is no destination path, hence, no conflict.
+      // create the destination directory; mkdirs also creates any
+      // missing parent directories
+      fs.mkdirs(outputPath);
     }
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 518d789..7ec4478 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -842,7 +842,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
       Configuration fsConf) {
     if (conflictResolution == null) {
       this.conflictResolution = ConflictResolution.valueOf(
-          getConfictModeOption(context, fsConf));
+          getConfictModeOption(context, fsConf, DEFAULT_CONFLICT_MODE));
     }
     return conflictResolution;
   }
@@ -889,14 +889,15 @@ public class StagingCommitter extends 
AbstractS3ACommitter {
    * Get the conflict mode option string.
    * @param context context with the config
    * @param fsConf filesystem config
+   * @param defVal default value.
    * @return the trimmed configuration option, upper case.
    */
   public static String getConfictModeOption(JobContext context,
-      Configuration fsConf) {
+      Configuration fsConf, String defVal) {
     return getConfigurationOption(context,
         fsConf,
         FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
-        DEFAULT_CONFLICT_MODE).toUpperCase(Locale.ENGLISH);
+        defVal).toUpperCase(Locale.ENGLISH);
   }
 
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index ef9c999..ac03bdc 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -173,7 +173,7 @@ This then is the problem which the S3A committers address:
 *How to safely and reliably commit work to Amazon S3 or compatible object 
store*
 
 
-## Meet the S3A Commmitters
+## Meet the S3A Committers
 
 Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes
 designed to integrate with the Hadoop and Spark job commit protocols, classes
@@ -226,8 +226,8 @@ it is committed through the standard "v1" commit algorithm.
 When the Job is committed, the Job Manager reads the lists of pending writes 
from its
 HDFS Job destination directory and completes those uploads.
 
-Cancelling a task is straightforward: the local directory is deleted with
-its staged data. Cancelling a job is achieved by reading in the lists of
+Canceling a task is straightforward: the local directory is deleted with
+its staged data. Canceling a job is achieved by reading in the lists of
 pending writes from the HDFS job attempt directory, and aborting those
 uploads. For extra safety, all outstanding multipart writes to the destination 
directory
 are aborted.
@@ -537,9 +537,8 @@ Conflict management is left to the execution engine itself.
 |--------|-------|-----------|-------------|---------|---------|
 | `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a 
`_SUCCESS` file  at the end of each job | `true` |
 | `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for 
parallel operations on files. | 8 |
-| `fs.s3a.committer.staging.conflict-mode` |  | X | X | Conflict resolution: 
`fail`, `abort` or `overwrite`| `fail` |
+| `fs.s3a.committer.staging.conflict-mode` |  | X | X | Conflict resolution: 
`fail`, `append` or `replace`| `append` |
 | `fs.s3a.committer.staging.unique-filenames` |  | X | X | Generate unique 
filenames | `true` |
-
 | `fs.s3a.committer.magic.enabled` | X |  | | Enable "magic committer" support 
in the filesystem | `false` |
 
 
@@ -607,7 +606,7 @@ Conflict management is left to the execution engine itself.
 
 <property>
   <name>fs.s3a.committer.staging.conflict-mode</name>
-  <value>fail</value>
+  <value>append</value>
   <description>
     Staging committer conflict resolution policy.
     Supported: "fail", "append", "replace".
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index 99f1a07..d421118 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -42,6 +42,8 @@ import 
org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.util.Progressable;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Relays FS calls to the mocked FS, allows for some extra logging with
  * stack traces to be included, stubbing out other methods
@@ -241,6 +243,12 @@ public class MockS3AFileSystem extends S3AFileSystem {
   }
 
   @Override
+  public boolean mkdirs(Path f) throws IOException {
+    event("mkdirs(%s)", f);
+    return mock.mkdirs(f);
+  }
+
+  @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     event("mkdirs(%s)", f);
     return mock.mkdirs(f, permission);
@@ -249,7 +257,8 @@ public class MockS3AFileSystem extends S3AFileSystem {
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
     event("getFileStatus(%s)", f);
-    return mock.getFileStatus(f);
+    return checkNotNull(mock.getFileStatus(f),
+        "Mock getFileStatus(%s) returned null", f);
   }
 
   @Override
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index e8645b8..6023daa 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -18,10 +18,8 @@
 
 package org.apache.hadoop.fs.s3a.commit;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -53,7 +51,6 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 
 /**
@@ -109,6 +106,13 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    String bucketName = getTestBucketName(conf);
+    removeBucketOverrides(bucketName, conf,
+        MAGIC_COMMITTER_ENABLED,
+        S3A_COMMITTER_FACTORY_KEY,
+        FS_S3A_COMMITTER_NAME,
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+
     conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
     conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
     conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
@@ -355,25 +359,7 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
    * @throws IOException IO Failure
    */
   protected SuccessData verifySuccessMarker(Path dir) throws IOException {
-    assertPathExists("Success marker",
-        new Path(dir, _SUCCESS));
-    SuccessData successData = loadSuccessMarker(dir);
-    log().info("Success data {}", successData.toString());
-    log().info("Metrics\n{}",
-        successData.dumpMetrics("  ", " = ", "\n"));
-    log().info("Diagnostics\n{}",
-        successData.dumpDiagnostics("  ", " = ", "\n"));
-    return successData;
-  }
-
-  /**
-   * Load the success marker and return the data inside it.
-   * @param dir directory containing the marker
-   * @return the loaded data
-   * @throws IOException on any failure to load or validate the data
-   */
-  protected SuccessData loadSuccessMarker(Path dir) throws IOException {
-    return SuccessData.load(getFileSystem(), new Path(dir, _SUCCESS));
+    return validateSuccessFile(dir, "", getFileSystem(), "query");
   }
 
   /**
@@ -447,32 +433,18 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
   /**
    * Load in the success data marker: this guarantees that an S3A
    * committer was used.
-   * @param fs filesystem
    * @param outputPath path of job
-   * @param committerName name of committer to match
+   * @param committerName name of committer to match, or ""
+   * @param fs filesystem
+   * @param origin origin (e.g. "teragen" for messages)
    * @return the success data
    * @throws IOException IO failure
    */
-  public static SuccessData validateSuccessFile(final S3AFileSystem fs,
-      final Path outputPath, final String committerName) throws IOException {
-    SuccessData successData = null;
-    try {
-      successData = loadSuccessFile(fs, outputPath);
-    } catch (FileNotFoundException e) {
-      // either the output path is missing or, if its the success file,
-      // somehow the relevant committer wasn't picked up.
-      String dest = outputPath.toString();
-      LOG.error("No _SUCCESS file found under {}", dest);
-      List<String> files = new ArrayList<>();
-      applyLocatedFiles(fs.listFiles(outputPath, true),
-          (status) -> {
-            files.add(status.getPath().toString());
-            LOG.error("{} {}", status.getPath(), status.getLen());
-          });
-      throw new AssertionError("No _SUCCESS file in " + dest
-          + "; found : " + files.stream().collect(Collectors.joining("\n")),
-          e);
-    }
+  public static SuccessData validateSuccessFile(final Path outputPath,
+      final String committerName,
+      final S3AFileSystem fs,
+      final String origin) throws IOException {
+    SuccessData successData = loadSuccessFile(fs, outputPath, origin);
     String commitDetails = successData.toString();
     LOG.info("Committer name " + committerName + "\n{}",
         commitDetails);
@@ -480,8 +452,10 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
         successData.dumpMetrics("  ", " = ", "\n"));
     LOG.info("Diagnostics\n{}",
         successData.dumpDiagnostics("  ", " = ", "\n"));
-    assertEquals("Wrong committer in " + commitDetails,
-        committerName, successData.getCommitter());
+    if (!committerName.isEmpty()) {
+      assertEquals("Wrong committer in " + commitDetails,
+          committerName, successData.getCommitter());
+    }
     return successData;
   }
 
@@ -489,16 +463,29 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
    * Load a success file; fail if the file is empty/nonexistent.
    * @param fs filesystem
    * @param outputPath directory containing the success file.
+   * @param origin origin of the file
    * @return the loaded file.
    * @throws IOException failure to find/load the file
-   * @throws AssertionError file is 0-bytes long
+   * @throws AssertionError file is 0-bytes long or is not a file
    */
   public static SuccessData loadSuccessFile(final S3AFileSystem fs,
-      final Path outputPath) throws IOException {
+      final Path outputPath, final String origin) throws IOException {
+    ContractTestUtils.assertPathExists(fs,
+        "Output directory " + outputPath
+            + " from " + origin
+            + " not found: Job may not have executed",
+        outputPath);
     Path success = new Path(outputPath, _SUCCESS);
-    ContractTestUtils.assertIsFile(fs, success);
-    FileStatus status = fs.getFileStatus(success);
-    assertTrue("0 byte success file - not a s3guard committer " + success,
+    FileStatus status = ContractTestUtils.verifyPathExists(fs,
+        "job completion marker " + success
+            + " from " + origin
+            + " not found: Job may have failed",
+        success);
+    assertTrue("_SUCCESS outout from " + origin + " is not a file " + status,
+        status.isFile());
+    assertTrue("0 byte success file "
+            + success + " from " + origin
+            + "; a s3guard committer was not used",
         status.getLen() > 0);
     return SuccessData.load(fs, success);
   }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
index 1fb3d89..c5e0265 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -147,8 +147,8 @@ public abstract class AbstractITCommitMRJob extends 
AbstractYarnClusterITest {
     }
     Collections.sort(actualFiles);
 
-    SuccessData successData = validateSuccessFile(fs, outputPath,
-        committerName());
+    SuccessData successData = validateSuccessFile(outputPath, committerName(),
+        fs, "MR job");
     List<String> successFiles = successData.getFilenames();
     String commitData = successData.toString();
     assertTrue("No filenames in " + commitData,
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
index 17ad7f6..fd585d0 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.commit.staging;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
@@ -196,9 +198,34 @@ public class StagingTestBase {
     when(mockS3.exists(path)).thenReturn(true);
   }
 
+  public static void pathIsDirectory(FileSystem mockS3, Path path)
+      throws IOException {
+    hasFileStatus(mockS3, path,
+        new FileStatus(0, true, 0, 0, 0, path));
+  }
+
+  public static void pathIsFile(FileSystem mockS3, Path path)
+      throws IOException {
+    pathExists(mockS3, path);
+    hasFileStatus(mockS3, path,
+        new FileStatus(0, false, 0, 0, 0, path));
+  }
+
   public static void pathDoesNotExist(FileSystem mockS3, Path path)
       throws IOException {
     when(mockS3.exists(path)).thenReturn(false);
+    when(mockS3.getFileStatus(path)).thenThrow(
+        new FileNotFoundException("mock fnfe of " + path));
+  }
+
+  public static void hasFileStatus(FileSystem mockS3,
+      Path path, FileStatus status) throws IOException {
+    when(mockS3.getFileStatus(path)).thenReturn(status);
+  }
+
+  public static void mkdirsHasOutcome(FileSystem mockS3,
+      Path path, boolean outcome) throws IOException {
+    when(mockS3.mkdirs(path)).thenReturn(outcome);
   }
 
   public static void canDelete(FileSystem mockS3, String... children)
@@ -221,7 +248,12 @@ public class StagingTestBase {
 
   public static void verifyExistenceChecked(FileSystem mockS3, Path path)
       throws IOException {
-    verify(mockS3).exists(path);
+    verify(mockS3).getFileStatus(path);
+  }
+
+  public static void verifyMkdirsInvoked(FileSystem mockS3, Path path)
+      throws IOException {
+    verify(mockS3).mkdirs(path);
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
index 2f7e8e0..505a99b 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -117,10 +117,10 @@ public class TestStagingCommitter extends 
StagingTestBase.MiniDFSTest {
 
   /**
    * Test array for parameterized test runs: how many threads and
-   * how many files to use.
+   * whether or not filenames are unique.
    * @return a list of parameter tuples.
    */
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name="threads-{0}-unique-{1}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][] {
         {0, false},
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
index b511293..994ecef 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.fs.s3a.commit.staging;
 
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathExistsException;
 import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
@@ -34,6 +40,9 @@ import static org.mockito.Mockito.*;
 public class TestStagingDirectoryOutputCommitter
     extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStagingDirectoryOutputCommitter.class);
+
   @Override
   DirectoryStagingCommitter newJobCommitter() throws Exception {
     return new DirectoryStagingCommitter(outputPath,
@@ -53,8 +62,10 @@ public class TestStagingDirectoryOutputCommitter
   public void testDefaultConflictResolution() throws Exception {
     getJob().getConfiguration().unset(
         FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
-    verifyFailureConflictOutcome();
+    pathIsDirectory(getMockS3A(), outputPath);
+    verifyJobSetupAndCommit();
   }
+
   @Test
   public void testFailConflictResolution() throws Exception {
     getJob().getConfiguration().set(
@@ -63,8 +74,7 @@ public class TestStagingDirectoryOutputCommitter
   }
 
   protected void verifyFailureConflictOutcome() throws Exception {
-    FileSystem mockS3 = getMockS3A();
-    pathExists(mockS3, outputPath);
+    pathIsDirectory(getMockS3A(), outputPath);
     final DirectoryStagingCommitter committer = newJobCommitter();
 
     // this should fail
@@ -76,32 +86,36 @@ public class TestStagingDirectoryOutputCommitter
     // but there are no checks in job commit (HADOOP-15469)
     committer.commitJob(getJob());
 
-    reset(mockS3);
-    pathDoesNotExist(mockS3, outputPath);
+    reset((FileSystem) getMockS3A());
+    pathDoesNotExist(getMockS3A(), outputPath);
 
     committer.setupJob(getJob());
-    verifyExistenceChecked(mockS3, outputPath);
-    verifyNoMoreInteractions(mockS3);
+    verifyExistenceChecked(getMockS3A(), outputPath);
+    verifyMkdirsInvoked(getMockS3A(), outputPath);
+    verifyNoMoreInteractions((FileSystem) getMockS3A());
 
-    reset(mockS3);
-    pathDoesNotExist(mockS3, outputPath);
+    reset((FileSystem) getMockS3A());
+    pathDoesNotExist(getMockS3A(), outputPath);
     committer.commitJob(getJob());
-    verifyCompletion(mockS3);
+    verifyCompletion(getMockS3A());
   }
 
   @Test
   public void testAppendConflictResolution() throws Exception {
-    FileSystem mockS3 = getMockS3A();
-
-    pathExists(mockS3, outputPath);
 
     getJob().getConfiguration().set(
         FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+    FileSystem mockS3 = getMockS3A();
+    pathIsDirectory(mockS3, outputPath);
+    verifyJobSetupAndCommit();
+  }
 
+  protected void verifyJobSetupAndCommit()
+      throws Exception {
     final DirectoryStagingCommitter committer = newJobCommitter();
 
     committer.setupJob(getJob());
-    verifyNoMoreInteractions(mockS3);
+    FileSystem mockS3 = getMockS3A();
 
     Mockito.reset(mockS3);
     pathExists(mockS3, outputPath);
@@ -114,7 +128,7 @@ public class TestStagingDirectoryOutputCommitter
   public void testReplaceConflictResolution() throws Exception {
     FileSystem mockS3 = getMockS3A();
 
-    pathExists(mockS3, outputPath);
+    pathIsDirectory(mockS3, outputPath);
 
     getJob().getConfiguration().set(
         FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
@@ -122,7 +136,6 @@ public class TestStagingDirectoryOutputCommitter
     final DirectoryStagingCommitter committer = newJobCommitter();
 
     committer.setupJob(getJob());
-    verifyNoMoreInteractions(mockS3);
 
     Mockito.reset(mockS3);
     pathExists(mockS3, outputPath);
@@ -133,4 +146,49 @@ public class TestStagingDirectoryOutputCommitter
     verifyCompletion(mockS3);
   }
 
+  @Test
+  public void testReplaceConflictFailsIfDestIsFile() throws Exception {
+    pathIsFile(getMockS3A(), outputPath);
+
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
+
+    intercept(PathExistsException.class,
+        InternalCommitterConstants.E_DEST_EXISTS,
+        "Expected a PathExistsException as the destination"
+            + " was a file",
+        () -> {
+          newJobCommitter().setupJob(getJob());
+        });
+  }
+
+  @Test
+  public void testAppendConflictFailsIfDestIsFile() throws Exception {
+    pathIsFile(getMockS3A(), outputPath);
+
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+
+    intercept(PathExistsException.class,
+        InternalCommitterConstants.E_DEST_EXISTS,
+        "Expected a PathExistsException as a the destination"
+            + " is a file",
+        () -> {
+          newJobCommitter().setupJob(getJob());
+        });
+  }
+
+  @Test
+  public void testValidateDefaultConflictMode() throws Throwable {
+    Configuration baseConf = new Configuration(true);
+    String[] sources = baseConf.getPropertySources(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    String sourceStr = Arrays.stream(sources)
+        .collect(Collectors.joining(","));
+    LOG.info("source of conflict mode {}", sourceStr);
+    String baseConfVal = baseConf
+        .getTrimmed(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    assertEquals("conflict mode in core config from " + sourceStr,
+        CONFLICT_MODE_APPEND, baseConfVal);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
index 2409b68..8116b79 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
@@ -33,7 +33,6 @@ import org.junit.Test;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
 import org.apache.hadoop.mapreduce.JobContext;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -80,48 +79,13 @@ public class TestStagingPartitionedTaskCommit
 
   @Test
   public void testDefault() throws Exception {
-    FileSystem mockS3 = getMockS3A();
-
     JobContext job = getJob();
     job.getConfiguration().unset(
         FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
     final PartitionedStagingCommitter committer = newTaskCommitter();
 
     committer.setupTask(getTAC());
-    assertConflictResolution(committer, job, ConflictResolution.FAIL);
-    createTestOutputFiles(relativeFiles,
-        committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration());
-
-    // test failure when one partition already exists
-    reset(mockS3);
-    Path exists = new Path(outputPath, relativeFiles.get(0)).getParent();
-    pathExists(mockS3, exists);
-
-    intercept(PathExistsException.class,
-        InternalCommitterConstants.E_DEST_EXISTS,
-        "Expected a PathExistsException as a partition"
-            + " already exists:" + exists,
-        () ->  {
-            committer.commitTask(getTAC());
-            mockS3.getFileStatus(exists);
-        });
-
-    // test success
-    reset(mockS3);
-
-    committer.commitTask(getTAC());
-    Set<String> files = Sets.newHashSet();
-    for (InitiateMultipartUploadRequest request :
-        getMockResults().getRequests().values()) {
-      assertEquals(BUCKET, request.getBucketName());
-      files.add(request.getKey());
-    }
-    assertEquals("Should have the right number of uploads",
-        relativeFiles.size(), files.size());
-
-    Set<String> expected = buildExpectedList(committer);
-
-    assertEquals("Should have correct paths", expected, files);
+    assertConflictResolution(committer, job, ConflictResolution.APPEND);
   }
 
   @Test
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
index ea28ecb..3685766 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
@@ -19,7 +19,12 @@
 package org.apache.hadoop.fs.s3a.commit.staging.integration;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
 
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -30,6 +35,9 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+
 /** ITest of the low level protocol methods. */
 public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
 
@@ -58,6 +66,32 @@ public class ITestDirectoryCommitProtocol extends 
ITestStagingCommitProtocol {
   }
 
   /**
+   * This is here because somehow test runs were failing with
+   * the conflict mode being fail. Unsetting per-bucket options
+   * in setup made this go away; it's retained for regression
+   * testing.
+   */
+  @Test
+  public void testValidateDefaultConflictMode() throws Throwable {
+    describe("Checking default conflict mode adoption");
+    Configuration baseConf = new Configuration(true);
+    String[] sources = baseConf.getPropertySources(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    String sourceStr = Arrays.stream(sources)
+        .collect(Collectors.joining(","));
+    String baseConfVal = baseConf
+        .getTrimmed(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    assertEquals("conflict mode in core config from "+ sourceStr,
+        CONFLICT_MODE_APPEND, baseConfVal);
+
+    Configuration fsConf = getFileSystem().getConf();
+    String conflictModeDefVal = fsConf
+        .getTrimmed(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    assertEquals("conflict mode in filesystem",
+        CONFLICT_MODE_APPEND, conflictModeDefVal);
+  }
+
+  /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.
    */
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
index 7db3068..479b3c8 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.s3a.commit.terasort;
 
+import java.io.File;
+import java.nio.charset.Charset;
 import java.util.Optional;
 import java.util.function.BiConsumer;
 
@@ -27,13 +29,13 @@ import org.junit.runners.MethodSorters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.terasort.TeraGen;
 import org.apache.hadoop.examples.terasort.TeraSort;
 import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
 import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.DurationInfo;
@@ -108,6 +110,9 @@ public abstract class AbstractCommitTerasortIT extends
     yarnConfig.setBoolean(
         TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
         true);
+    yarnConfig.setBoolean(
+        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+        false);
     terasortPath = new Path("/terasort-" + getClass().getSimpleName())
         .makeQualified(getFileSystem());
     sortInput = new Path(terasortPath, "sortin");
@@ -143,7 +148,7 @@ public abstract class AbstractCommitTerasortIT extends
     assertEquals(stage
         + "(" + StringUtils.join(", ", args) + ")"
         + " failed", 0, result);
-    validateSuccessFile(getFileSystem(), dest, committerName());
+    validateSuccessFile(dest, committerName(), getFileSystem(), stage);
     return Optional.of(d);
   }
 
@@ -161,6 +166,7 @@ public abstract class AbstractCommitTerasortIT extends
   @Test
   public void test_110_teragen() throws Throwable {
     describe("Teragen to %s", sortInput);
+    getFileSystem().delete(sortInput, true);
 
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
@@ -174,7 +180,9 @@ public abstract class AbstractCommitTerasortIT extends
   @Test
   public void test_120_terasort() throws Throwable {
     describe("Terasort from %s to %s", sortInput, sortOutput);
-    loadSuccessFile(getFileSystem(), sortInput);
+    getFileSystem().delete(sortOutput, true);
+
+    loadSuccessFile(getFileSystem(), sortInput, "previous teragen stage");
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
     // this job adds some data, so skip it.
@@ -189,7 +197,8 @@ public abstract class AbstractCommitTerasortIT extends
   @Test
   public void test_130_teravalidate() throws Throwable {
     describe("TeraValidate from %s to %s", sortOutput, sortValidate);
-    loadSuccessFile(getFileSystem(), sortOutput);
+    getFileSystem().delete(sortValidate, true);
+    loadSuccessFile(getFileSystem(), sortOutput, "previous terasort stage");
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
     teravalidateStageDuration = executeStage("TeraValidate",
@@ -224,9 +233,9 @@ public abstract class AbstractCommitTerasortIT extends
     stage.accept("Validate", teravalidateStageDuration);
     stage.accept("Completed", terasortDuration);
     String text = results.toString();
-    Path path = new Path(terasortPath, "results.csv");
-    LOG.info("Results are in {}\n{}", path, text);
-    ContractTestUtils.writeTextFile(getFileSystem(), path, text, true);
+    File resultsFile = File.createTempFile("results", ".csv");
+    FileUtils.write(resultsFile, text, Charset.forName("UTF-8"));
+    LOG.info("Results are in {}\n{}", resultsFile, text);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to