HADOOP-15576. S3A Multipart Uploader to work with S3Guard and encryption 
Originally contributed by Ewan Higgs with refinements by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ec97abb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ec97abb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ec97abb

Branch: refs/heads/HDFS-12090
Commit: 2ec97abb2e93c1a8127e7a146c08e26454b583fa
Parents: 4203bc7
Author: Ewan Higgs <ewan.hi...@wdc.com>
Authored: Wed Aug 8 13:50:23 2018 +0200
Committer: Ewan Higgs <ewan.hi...@wdc.com>
Committed: Wed Aug 8 13:50:23 2018 +0200

----------------------------------------------------------------------
 .../hadoop/fs/FileSystemMultipartUploader.java  |  69 +++--
 .../org/apache/hadoop/fs/MultipartUploader.java |  32 +-
 .../java/org/apache/hadoop/fs/PartHandle.java   |   8 +-
 .../java/org/apache/hadoop/fs/PathHandle.java   |   9 +-
 .../fs/AbstractSystemMultipartUploaderTest.java | 143 ---------
 .../TestLocalFileSystemMultipartUploader.java   |  65 ----
 .../AbstractContractMultipartUploaderTest.java  | 300 +++++++++++++++++++
 .../TestLocalFSContractMultipartUploader.java   |  43 +++
 .../hadoop/fs/TestHDFSMultipartUploader.java    |  76 -----
 .../hdfs/TestHDFSContractMultipartUploader.java |  58 ++++
 .../hadoop/fs/s3a/S3AMultipartUploader.java     | 177 +++++++----
 .../hadoop/fs/s3a/WriteOperationHelper.java     |   4 +
 ...rg.apache.hadoop.fs.MultipartUploaderFactory |  15 -
 ...rg.apache.hadoop.fs.MultipartUploaderFactory |  15 +
 .../s3a/ITestS3AContractMultipartUploader.java  | 116 +++++++
 .../apache/hadoop/fs/s3a/S3ATestConstants.java  |   5 +
 .../fs/s3a/TestS3AMultipartUploaderSupport.java |  84 ++++++
 .../TestStagingPartitionedJobCommit.java        |   4 +-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java |   4 +-
 .../src/test/resources/contract/s3a.xml         |   5 +
 20 files changed, 831 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
index b57ff3d..a700a9f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
@@ -16,12 +16,6 @@
  */
 package org.apache.hadoop.fs;
 
-import com.google.common.base.Charsets;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -29,13 +23,26 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import static org.apache.hadoop.fs.Path.mergePaths;
+
 /**
  * A MultipartUploader that uses the basic FileSystem commands.
  * This is done in three stages:
- * Init - create a temp _multipart directory.
- * PutPart - copying the individual parts of the file to the temp directory.
- * Complete - use {@link FileSystem#concat} to merge the files; and then delete
- * the temp directory.
+ * <ul>
+ *   <li>Init - create a temp {@code _multipart} directory.</li>
+ *   <li>PutPart - copying the individual parts of the file to the temp
+ *   directory.</li>
+ *   <li>Complete - use {@link FileSystem#concat} to merge the files;
+ *   and then delete the temp directory.</li>
+ * </ul>
  */
 public class FileSystemMultipartUploader extends MultipartUploader {
 
@@ -64,28 +71,44 @@ public class FileSystemMultipartUploader extends MultipartUploader {
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
     Path partPath =
-        Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR),
+        mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR),
             new Path(Integer.toString(partNumber) + ".part")));
-    FSDataOutputStreamBuilder outputStream = fs.createFile(partPath);
-    FSDataOutputStream fsDataOutputStream = outputStream.build();
-    IOUtils.copy(inputStream, fsDataOutputStream, 4096);
-    fsDataOutputStream.close();
+    try(FSDataOutputStream fsDataOutputStream =
+            fs.createFile(partPath).build()) {
+      IOUtils.copy(inputStream, fsDataOutputStream, 4096);
+    } finally {
+      org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream);
+    }
     return BBPartHandle.from(ByteBuffer.wrap(
         partPath.toString().getBytes(Charsets.UTF_8)));
   }
 
   private Path createCollectorPath(Path filePath) {
-    return Path.mergePaths(filePath.getParent(),
-        Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
-            Path.mergePaths(new Path("_multipart"),
+    return mergePaths(filePath.getParent(),
+        mergePaths(new Path(filePath.getName().split("\\.")[0]),
+            mergePaths(new Path("_multipart"),
                 new Path(Path.SEPARATOR))));
   }
 
+  private PathHandle getPathHandle(Path filePath) throws IOException {
+    FileStatus status = fs.getFileStatus(filePath);
+    return fs.getPathHandle(status);
+  }
+
   @Override
   @SuppressWarnings("deprecation") // rename w/ OVERWRITE
   public PathHandle complete(Path filePath,
       List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
       throws IOException {
+
+    if (handles.isEmpty()) {
+      throw new IOException("Empty upload");
+    }
+    // If destination already exists, we believe we already completed it.
+    if (fs.exists(filePath)) {
+      return getPathHandle(filePath);
+    }
+
     handles.sort(Comparator.comparing(Pair::getKey));
     List<Path> partHandles = handles
         .stream()
@@ -97,22 +120,26 @@ public class FileSystemMultipartUploader extends MultipartUploader {
         .collect(Collectors.toList());
 
     Path collectorPath = createCollectorPath(filePath);
-    Path filePathInsideCollector = Path.mergePaths(collectorPath,
+    Path filePathInsideCollector = mergePaths(collectorPath,
         new Path(Path.SEPARATOR + filePath.getName()));
     fs.create(filePathInsideCollector).close();
     fs.concat(filePathInsideCollector,
         partHandles.toArray(new Path[handles.size()]));
     fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
     fs.delete(collectorPath, true);
-    FileStatus status = fs.getFileStatus(filePath);
-    return fs.getPathHandle(status);
+    return getPathHandle(filePath);
   }
 
   @Override
   public void abort(Path filePath, UploadHandle uploadId) throws IOException {
     byte[] uploadIdByteArray = uploadId.toByteArray();
+    Preconditions.checkArgument(uploadIdByteArray.length != 0,
+        "UploadId is empty");
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
+
+    // force a check for a file existing; raises FNFE if not found
+    fs.getFileStatus(collectorPath);
     fs.delete(collectorPath, true);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
index 24a9216..47fd9f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -21,17 +21,20 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 /**
  * MultipartUploader is an interface for copying files multipart and across
  * multiple nodes. Users should:
- * 1. Initialize an upload
- * 2. Upload parts in any order
- * 3. Complete the upload in order to have it materialize in the destination FS.
+ * <ol>
+ *   <li>Initialize an upload</li>
+ *   <li>Upload parts in any order</li>
+ *   <li>Complete the upload in order to have it materialize in the destination
+ *   FS</li>
+ * </ol>
  *
  * Implementers should make sure that the complete function should make sure
  * that 'complete' will reorder parts if the destination FS doesn't already
@@ -45,7 +48,7 @@ public abstract class MultipartUploader {
    * Initialize a multipart upload.
    * @param filePath Target path for upload.
    * @return unique identifier associating part uploads.
-   * @throws IOException
+   * @throws IOException IO failure
    */
   public abstract UploadHandle initialize(Path filePath) throws IOException;
 
@@ -53,12 +56,13 @@ public abstract class MultipartUploader {
    * Put part as part of a multipart upload. It should be possible to have
    * parts uploaded in any order (or in parallel).
    * @param filePath Target path for upload (same as {@link #initialize(Path)}).
-   * @param inputStream Data for this part.
+   * @param inputStream Data for this part. Implementations MUST close this
+   * stream after reading in the data.
    * @param partNumber Index of the part relative to others.
    * @param uploadId Identifier from {@link #initialize(Path)}.
    * @param lengthInBytes Target length to read from the stream.
    * @return unique PartHandle identifier for the uploaded part.
-   * @throws IOException
+   * @throws IOException IO failure
    */
   public abstract PartHandle putPart(Path filePath, InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
@@ -67,12 +71,12 @@ public abstract class MultipartUploader {
   /**
    * Complete a multipart upload.
    * @param filePath Target path for upload (same as {@link #initialize(Path)}.
-   * @param handles Identifiers with associated part numbers from
-   *          {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
+   * @param handles non-empty list of identifiers with associated part numbers
+   *          from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
    *          Depending on the backend, the list order may be significant.
    * @param multipartUploadId Identifier from {@link #initialize(Path)}.
    * @return unique PathHandle identifier for the uploaded file.
-   * @throws IOException
+   * @throws IOException IO failure or the handle list is empty.
    */
   public abstract PathHandle complete(Path filePath,
       List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
@@ -81,10 +85,10 @@ public abstract class MultipartUploader {
   /**
    * Aborts a multipart upload.
    * @param filePath Target path for upload (same as {@link #initialize(Path)}.
-   * @param multipartuploadId Identifier from {@link #initialize(Path)}.
-   * @throws IOException
+   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+   * @throws IOException IO failure
    */
-  public abstract void abort(Path filePath, UploadHandle multipartuploadId)
+  public abstract void abort(Path filePath, UploadHandle multipartUploadId)
       throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
index df70b74..47ce3ab 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
@@ -16,14 +16,14 @@
  */
 package org.apache.hadoop.fs;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
- * Opaque, serializable reference to an part id for multipart uploads.
+ * Opaque, serializable reference to a part id for multipart uploads.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
index 60aa6a5..d5304ba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java
@@ -25,15 +25,16 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Opaque, serializable reference to an entity in the FileSystem. May contain
- * metadata sufficient to resolve or verify subsequent accesses indepedent of
+ * metadata sufficient to resolve or verify subsequent accesses independent of
  * other modifications to the FileSystem.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
+@FunctionalInterface
 public interface PathHandle extends Serializable {
 
   /**
-   * @return Serialized from in bytes.
+   * @return Serialized form in bytes.
    */
   default byte[] toByteArray() {
     ByteBuffer bb = bytes();
@@ -42,6 +43,10 @@ public interface PathHandle extends Serializable {
     return ret;
   }
 
+  /**
+   * Get the bytes of this path handle.
+   * @return the bytes to get to the process completing the upload.
+   */
   ByteBuffer bytes();
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
deleted file mode 100644
index f132089..0000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractSystemMultipartUploaderTest {
-
-  abstract FileSystem getFS() throws IOException;
-
-  abstract Path getBaseTestPath();
-
-  @Test
-  public void testMultipartUpload() throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 1; i <= 100; ++i) {
-      String contents = "ThisIsPart" + i + "\n";
-      sb.append(contents);
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-      partHandles.add(Pair.of(i, partHandle));
-    }
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
-    byte[] fileData = IOUtils.toByteArray(fs.open(file));
-    String readString = new String(fdData);
-    assertEquals(sb.toString(), readString);
-    assertArrayEquals(fdData, fileData);
-  }
-
-  @Test
-  public void testMultipartUploadReverseOrder() throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 1; i <= 100; ++i) {
-      String contents = "ThisIsPart" + i + "\n";
-      sb.append(contents);
-    }
-    for (int i = 100; i > 0; --i) {
-      String contents = "ThisIsPart" + i + "\n";
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-      partHandles.add(Pair.of(i, partHandle));
-    }
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
-    byte[] fileData = IOUtils.toByteArray(fs.open(file));
-    String readString = new String(fdData);
-    assertEquals(sb.toString(), readString);
-    assertArrayEquals(fdData, fileData);
-  }
-
-  @Test
-  public void testMultipartUploadReverseOrderNoNContiguousPartNumbers()
-      throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 2; i <= 200; i += 2) {
-      String contents = "ThisIsPart" + i + "\n";
-      sb.append(contents);
-    }
-    for (int i = 200; i > 0; i -= 2) {
-      String contents = "ThisIsPart" + i + "\n";
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-      partHandles.add(Pair.of(i, partHandle));
-    }
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
-    byte[] fileData = IOUtils.toByteArray(fs.open(file));
-    String readString = new String(fdData);
-    assertEquals(sb.toString(), readString);
-    assertArrayEquals(fdData, fileData);
-  }
-
-  @Test
-  public void testMultipartUploadAbort() throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    for (int i = 100; i >= 50; --i) {
-      String contents = "ThisIsPart" + i + "\n";
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-    }
-    mpu.abort(file, uploadHandle);
-
-    String contents = "ThisIsPart49\n";
-    int len = contents.getBytes().length;
-    InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-
-    try {
-      mpu.putPart(file, is, 49, uploadHandle, len);
-      fail("putPart should have thrown an exception");
-    } catch (IOException ok) {
-      // ignore
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
deleted file mode 100644
index 21d01b6..0000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Test the FileSystemMultipartUploader on local file system.
- */
-public class TestLocalFileSystemMultipartUploader
-    extends AbstractSystemMultipartUploaderTest {
-
-  private static FileSystem fs;
-  private File tmp;
-
-  @BeforeClass
-  public static void init() throws IOException {
-    fs = LocalFileSystem.getLocal(new Configuration());
-  }
-
-  @Before
-  public void setup() throws IOException {
-    tmp = getRandomizedTestDir();
-    tmp.mkdirs();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    tmp.delete();
-  }
-
-  @Override
-  public FileSystem getFS() {
-    return fs;
-  }
-
-  @Override
-  public Path getBaseTestPath() {
-    return new Path(tmp.getAbsolutePath());
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
new file mode 100644
index 0000000..c0e1600
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import org.junit.Test;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractMultipartUploaderTest extends
+    AbstractFSContractTestBase {
+
+  /**
+   * The payload is the part number repeated for the length of the part.
+   * This makes checking the correctness of the upload straightforward.
+   * @param partNumber part number
+   * @return the bytes to upload.
+   */
+  private byte[] generatePayload(int partNumber) {
+    int sizeInBytes = partSizeInBytes();
+    ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+    for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
+      buffer.putInt(partNumber);
+    }
+    return buffer.array();
+  }
+
+  /**
+   * Load a path, make an MD5 digest.
+   * @param path path to load
+   * @return the digest array
+   * @throws IOException failure to read or digest the file.
+   */
+  protected byte[] digest(Path path) throws IOException {
+    FileSystem fs = getFileSystem();
+    try (InputStream in = fs.open(path)) {
+      byte[] fdData = IOUtils.toByteArray(in);
+      MessageDigest newDigest = DigestUtils.getMd5Digest();
+      return newDigest.digest(fdData);
+    }
+  }
+
+  /**
+   * Get the partition size in bytes to use for each upload.
+   * @return a number > 0
+   */
+  protected abstract int partSizeInBytes();
+
+  /**
+   * Get the number of test payloads to upload.
+   * @return a number > 1
+   */
+  protected int getTestPayloadCount() {
+    return 10;
+  }
+
+  /**
+   * Assert that a multipart upload is successful.
+   * @throws Exception failure
+   */
+  @Test
+  public void testSingleUpload() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testSingleUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    byte[] payload = generatePayload(1);
+    origDigest.update(payload);
+    InputStream is = new ByteArrayInputStream(payload);
+    PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
+        payload.length);
+    partHandles.add(Pair.of(1, partHandle));
+    PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
+        origDigest,
+        payload.length);
+
+    // Complete is idempotent
+    PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+    assertArrayEquals("Path handles differ", fd.toByteArray(),
+        fd2.toByteArray());
+  }
+
+  private PathHandle completeUpload(final Path file,
+      final MultipartUploader mpu,
+      final UploadHandle uploadHandle,
+      final List<Pair<Integer, PartHandle>> partHandles,
+      final MessageDigest origDigest,
+      final int expectedLength) throws IOException {
+    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+
+    FileStatus status = verifyPathExists(getFileSystem(),
+        "Completed file", file);
+    assertEquals("length of " + status,
+        expectedLength, status.getLen());
+
+    assertArrayEquals("digest of source and " + file
+            + " differ",
+        origDigest.digest(), digest(file));
+    return fd;
+  }
+
+  /**
+   * Assert that a multipart upload is successful.
+   * @throws Exception failure
+   */
+  @Test
+  public void testMultipartUpload() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testMultipartUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    final int payloadCount = getTestPayloadCount();
+    for (int i = 1; i <= payloadCount; ++i) {
+      byte[] payload = generatePayload(i);
+      origDigest.update(payload);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+        payloadCount * partSizeInBytes());
+  }
+
+  /**
+   * Assert that a multipart upload is successful even when the parts are
+   * given in the reverse order.
+   */
+  @Test
+  public void testMultipartUploadReverseOrder() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testMultipartUploadReverseOrder");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    final int payloadCount = getTestPayloadCount();
+    for (int i = 1; i <= payloadCount; ++i) {
+      byte[] payload = generatePayload(i);
+      origDigest.update(payload);
+    }
+    for (int i = payloadCount; i > 0; --i) {
+      byte[] payload = generatePayload(i);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+        payloadCount * partSizeInBytes());
+  }
+
+  /**
+   * Assert that a multipart upload is successful even when the parts are
+   * given in reverse order and the part numbers are not contiguous.
+   * Only the even part numbers from 2 up to twice
+   * {@code getTestPayloadCount()} are uploaded.
+   */
+  @Test
+  public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
+      throws Exception {
+    describe("Upload in reverse order and the part numbers are not"
+        + " contiguous");
+    FileSystem fs = getFileSystem();
+    Path file = path(
+        "testMultipartUploadReverseOrderNonContiguousPartNumbers");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    int payloadCount = 2 * getTestPayloadCount();
+    // Expected digest: payloads of the even part numbers, ascending.
+    for (int i = 2; i <= payloadCount; i += 2) {
+      byte[] payload = generatePayload(i);
+      origDigest.update(payload);
+    }
+    // Upload the same parts, starting from the highest part number.
+    for (int i = payloadCount; i > 0; i -= 2) {
+      byte[] payload = generatePayload(i);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    // Expected length passed on: one part size per uploaded part.
+    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+        getTestPayloadCount() * partSizeInBytes());
+  }
+
+  /**
+   * Abort an upload after several parts have been written, then check
+   * that the upload handle can no longer be used and that no file exists
+   * at the destination path.
+   */
+  @Test
+  public void testMultipartUploadAbort() throws Exception {
+    describe("Upload and then abort it before completing");
+    final FileSystem fs = getFileSystem();
+    final Path dest = path("testMultipartUploadAbort");
+    final MultipartUploader uploader = MultipartUploaderFactory.get(fs, null);
+    final UploadHandle upload = uploader.initialize(dest);
+    final List<Pair<Integer, PartHandle>> uploadedParts = new ArrayList<>();
+    int partNumber = 20;
+    while (partNumber >= 10) {
+      final byte[] data = generatePayload(partNumber);
+      final PartHandle part = uploader.putPart(dest,
+          new ByteArrayInputStream(data), partNumber, upload, data.length);
+      uploadedParts.add(Pair.of(partNumber, part));
+      partNumber--;
+    }
+    uploader.abort(dest, upload);
+
+    // Once aborted, neither adding a part nor completing may succeed.
+    final String latePart = "ThisIsPart49\n";
+    final int latePartLength = latePart.getBytes(Charsets.UTF_8).length;
+    final InputStream latePartStream =
+        IOUtils.toInputStream(latePart, "UTF-8");
+
+    intercept(IOException.class,
+        () -> uploader.putPart(dest, latePartStream, 49, upload,
+            latePartLength));
+    intercept(IOException.class,
+        () -> uploader.complete(dest, uploadedParts, upload));
+
+    assertPathDoesNotExist("Uploaded file should not exist", dest);
+  }
+
+  /**
+   * Trying to abort from an invalid handle must fail.
+   * The handle wraps bytes which were never returned by
+   * {@code initialize()}; the abort is expected to raise a
+   * {@link FileNotFoundException}.
+   */
+  @Test
+  public void testAbortUnknownUpload() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testAbortUnknownUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(
+        "invalid-handle".getBytes(Charsets.UTF_8));
+    UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
+    intercept(FileNotFoundException.class,
+        () -> mpu.abort(file, uploadHandle));
+  }
+
+  /**
+   * An upload handle wrapping a zero-length buffer must be rejected
+   * by {@code abort()} with an {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testAbortEmptyUploadHandle() throws Exception {
+    final FileSystem fileSystem = getFileSystem();
+    final Path dest = path("testAbortEmptyUpload");
+    final MultipartUploader uploader =
+        MultipartUploaderFactory.get(fileSystem, null);
+    final UploadHandle emptyHandle =
+        BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
+    intercept(IllegalArgumentException.class,
+        () -> uploader.abort(dest, emptyHandle));
+  }
+
+  /**
+   * Completing an upload to which no parts were added must raise an
+   * {@link IOException}; the upload is then aborted.
+   */
+  @Test
+  public void testCompleteEmptyUpload() throws Exception {
+    describe("Expect an empty MPU to fail, but still be abortable");
+    final FileSystem fileSystem = getFileSystem();
+    final Path target = path("testCompleteEmptyUpload");
+    final MultipartUploader uploader =
+        MultipartUploaderFactory.get(fileSystem, null);
+    final UploadHandle upload = uploader.initialize(target);
+    final List<Pair<Integer, PartHandle>> noParts = new ArrayList<>();
+    intercept(IOException.class,
+        () -> uploader.complete(target, noParts, upload));
+    uploader.abort(target, upload);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
new file mode 100644
index 0000000..a50d2e4
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.localfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Runs the multipart uploader contract tests against the local filesystem.
+ */
+public class TestLocalFSContractMultipartUploader
+    extends AbstractContractMultipartUploaderTest {
+
+  /**
+   * No particular part size is needed for this filesystem.
+   * @return 1 kilobyte
+   */
+  @Override
+  protected int partSizeInBytes() {
+    return 1 << 10;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new LocalFSContract(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
deleted file mode 100644
index 96c5093..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-public class TestHDFSMultipartUploader
-    extends AbstractSystemMultipartUploaderTest {
-
-  private static MiniDFSCluster cluster;
-  private Path tmp;
-
-  @Rule
-  public TestName name = new TestName();
-
-  @BeforeClass
-  public static void init() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf,
-          GenericTestUtils.getRandomizedTestDir())
-        .numDataNodes(1)
-        .build();
-    cluster.waitClusterUp();
-  }
-
-  @AfterClass
-  public static void cleanup() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  @Before
-  public void setup() throws IOException {
-    tmp = new Path(cluster.getFileSystem().getWorkingDirectory(),
-        name.getMethodName());
-    cluster.getFileSystem().mkdirs(tmp);
-  }
-
-  @Override
-  public FileSystem getFS() throws IOException {
-    return cluster.getFileSystem();
-  }
-
-  @Override
-  public Path getBaseTestPath() {
-    return tmp;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
new file mode 100644
index 0000000..f3a5265
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.hdfs;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Runs the MultipartUploader contract tests against HDFS, with the
+ * cluster created and destroyed through {@link HDFSContract}.
+ */
+public class TestHDFSContractMultipartUploader extends
+    AbstractContractMultipartUploaderTest {
+
+  /**
+   * HDFS doesn't have any restriction on the part size.
+   * @return 1KB
+   */
+  @Override
+  protected int partSizeInBytes() {
+    return 1 << 10;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
index 34c88d4..6a1df54 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
@@ -17,15 +17,26 @@
  */
 package org.apache.hadoop.fs.s3a;
 
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BBPartHandle;
@@ -37,13 +48,8 @@ import org.apache.hadoop.fs.PartHandle;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.UploadHandle;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
 
 /**
  * MultipartUploader for S3AFileSystem. This uses the S3 multipart
@@ -53,6 +59,10 @@ public class S3AMultipartUploader extends MultipartUploader {
 
   private final S3AFileSystem s3a;
 
+  /** Header for Parts: {@value}. */
+
+  public static final String HEADER = "S3A-part01";
+
   public S3AMultipartUploader(FileSystem fs, Configuration conf) {
     if (!(fs instanceof S3AFileSystem)) {
       throw new IllegalArgumentException(
@@ -63,75 +73,72 @@ public class S3AMultipartUploader extends MultipartUploader {
 
   @Override
   public UploadHandle initialize(Path filePath) throws IOException {
+    // Start the upload through the filesystem's WriteOperationHelper.
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
-    InitiateMultipartUploadRequest request =
-        new InitiateMultipartUploadRequest(s3a.getBucket(), key);
-    LOG.debug("initialize request: {}", request);
-    InitiateMultipartUploadResult result = 
s3a.initiateMultipartUpload(request);
-    String uploadId = result.getUploadId();
+    String uploadId = writeHelper.initiateMultiPartUpload(key);
+    // The returned handle wraps the upload ID string encoded as UTF-8.
     return BBUploadHandle.from(ByteBuffer.wrap(
         uploadId.getBytes(Charsets.UTF_8)));
   }
 
   @Override
   public PartHandle putPart(Path filePath, InputStream inputStream,
-      int partNumber, UploadHandle uploadId, long lengthInBytes) {
+      int partNumber, UploadHandle uploadId, long lengthInBytes)
+      throws IOException {
+    // The upload request below takes the size as an int: reject lengths
+    // which the narrowing cast would silently truncate, and non-positive
+    // lengths which the part handle payload cannot represent, before any
+    // request is issued.
+    Preconditions.checkArgument(
+        lengthInBytes > 0 && lengthInBytes <= Integer.MAX_VALUE,
+        "Invalid part length: %s", lengthInBytes);
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
-    UploadPartRequest request = new UploadPartRequest();
     byte[] uploadIdBytes = uploadId.toByteArray();
-    request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8));
-    request.setInputStream(inputStream);
-    request.setPartSize(lengthInBytes);
-    request.setPartNumber(partNumber);
-    request.setBucketName(s3a.getBucket());
-    request.setKey(key);
-    LOG.debug("putPart request: {}", request);
-    UploadPartResult result = s3a.uploadPart(request);
+    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    UploadPartRequest request = writeHelper.newUploadPartRequest(key,
+        uploadIdString, partNumber, (int) lengthInBytes, inputStream, null,
+        0L);
+    UploadPartResult result = writeHelper.uploadPart(request);
     String eTag = result.getETag();
-    return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8)));
+    // The part handle carries both the etag and the part length.
+    return BBPartHandle.from(
+        ByteBuffer.wrap(
+            buildPartHandlePayload(eTag, lengthInBytes)));
   }
 
   @Override
   public PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) {
-    String key = s3a.pathToKey(filePath);
-    CompleteMultipartUploadRequest request =
-        new CompleteMultipartUploadRequest();
-    request.setBucketName(s3a.getBucket());
-    request.setKey(key);
+      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId)
+      throws IOException {
     byte[] uploadIdBytes = uploadId.toByteArray();
-    request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8));
-    List<PartETag> eTags = handles
-        .stream()
-        .map(handle -> {
-          byte[] partEtagBytes = handle.getRight().toByteArray();
-          return new PartETag(handle.getLeft(),
-              new String(partEtagBytes, 0, partEtagBytes.length,
-                  Charsets.UTF_8));
-        })
-        .collect(Collectors.toList());
-    request.setPartETags(eTags);
-    LOG.debug("Complete request: {}", request);
-    CompleteMultipartUploadResult completeMultipartUploadResult =
-        s3a.getAmazonS3Client().completeMultipartUpload(request);
-
-    byte[] eTag = DFSUtilClient.string2Bytes(
-        completeMultipartUploadResult.getETag());
+    checkUploadId(uploadIdBytes);
+    // An upload with no parts cannot be completed.
+    if (handles.isEmpty()) {
+      throw new IOException("Empty upload");
+    }
+
+    final WriteOperationHelper helper = s3a.getWriteOperationHelper();
+    final String destKey = s3a.pathToKey(filePath);
+    final String uploadIdText = new String(uploadIdBytes, Charsets.UTF_8);
+
+    // Unmarshall every part handle: record its etag against the part
+    // number and add its length to the running total.
+    final ArrayList<PartETag> partETags = new ArrayList<>(handles.size());
+    long uploadLength = 0;
+    for (Pair<Integer, PartHandle> part : handles) {
+      final Pair<Long, String> lengthAndEtag =
+          parsePartHandlePayload(part.getRight().toByteArray());
+      uploadLength += lengthAndEtag.getLeft();
+      partETags.add(new PartETag(part.getLeft(), lengthAndEtag.getRight()));
+    }
+    final CompleteMultipartUploadResult completed =
+        helper.completeMPUwithRetries(destKey, uploadIdText, partETags,
+            uploadLength, new AtomicInteger(0));
+
+    byte[] eTag = completed.getETag().getBytes(Charsets.UTF_8);
     return (PathHandle) () -> ByteBuffer.wrap(eTag);
   }
 
   @Override
-  public void abort(Path filePath, UploadHandle uploadId) {
+  public void abort(Path filePath, UploadHandle uploadId) throws IOException {
+    final byte[] uploadIdBytes = uploadId.toByteArray();
+    // Reject a zero-length upload ID before the abort is issued.
+    checkUploadId(uploadIdBytes);
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
-    byte[] uploadIdBytes = uploadId.toByteArray();
     String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
         Charsets.UTF_8);
-    AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a
-        .getBucket(), key, uploadIdString);
-    LOG.debug("Abort request: {}", request);
-    s3a.getAmazonS3Client().abortMultipartUpload(request);
+    // The abort goes through the WriteOperationHelper rather than the
+    // AWS client directly.
+    writeHelper.abortMultipartCommit(key, uploadIdString);
   }
 
   /**
@@ -141,10 +148,64 @@ public class S3AMultipartUploader extends MultipartUploader {
     @Override
     protected MultipartUploader createMultipartUploader(FileSystem fs,
         Configuration conf) {
-      if (fs.getScheme().equals("s3a")) {
+      // Constant-first comparison: a null scheme simply does not match.
+      if (FS_S3A.equals(fs.getScheme())) {
         return new S3AMultipartUploader(fs, conf);
       }
+      // Any other scheme is not handled here.
       return null;
     }
   }
+
+  private void checkUploadId(byte[] uploadId) throws IllegalArgumentException {
+    Preconditions.checkArgument(uploadId.length > 0,
+        "Empty UploadId is not valid");
+  }
+
+  /**
+   * Build the payload for marshalling.
+   * @param eTag upload etag
+   * @param len length
+   * @return a byte array to marshall.
+   * @throws IOException error writing the payload
+   */
+  @VisibleForTesting
+  static byte[] buildPartHandlePayload(String eTag, long len)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
+        "Empty etag");
+    Preconditions.checkArgument(len > 0,
+        "Invalid length");
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try(DataOutputStream output = new DataOutputStream(bytes)) {
+      output.writeUTF(HEADER);
+      output.writeLong(len);
+      output.writeUTF(eTag);
+    }
+    return bytes.toByteArray();
+  }
+
+  /**
+   * Parse the payload marshalled as a part handle.
+   * The expected layout is the one written by
+   * {@link #buildPartHandlePayload(String, long)}: header, length, etag.
+   * @param data handle data
+   * @return the length and etag
+   * @throws IOException error reading the payload, a header which does
+   * not match {@link #HEADER}, or a length which is not positive
+   */
+  static Pair<Long, String> parsePartHandlePayload(byte[] data)
+      throws IOException {
+
+    try(DataInputStream input =
+            new DataInputStream(new ByteArrayInputStream(data))) {
+      final String header = input.readUTF();
+      if (!HEADER.equals(header)) {
+        throw new IOException("Wrong header string: \"" + header + "\"");
+      }
+      final long len = input.readLong();
+      final String etag = input.readUTF();
+      // Zero is rejected as well as negative values, so report the
+      // offending value instead of describing it as "negative".
+      if (len <= 0) {
+        throw new IOException("Invalid length: " + len);
+      }
+      return Pair.of(len, etag);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 46ca65c..a85a87f 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -219,6 +219,10 @@ public class WriteOperationHelper {
       List<PartETag> partETags,
       long length,
       Retried retrying) throws IOException {
+    if (partETags.isEmpty()) {
+      throw new IOException(
+          "No upload parts in multipart upload to " + destKey);
+    }
     return invoker.retry("Completing multipart commit", destKey,
         true,
         retrying,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
 
b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
deleted file mode 100644
index 2e4bc24..0000000
--- 
a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
 
b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
new file mode 100644
index 0000000..2e4bc24
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
new file mode 100644
index 0000000..d28f39b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
+
+/**
+ * Test MultipartUploader with S3A.
+ * The tests are skipped unless scale tests are enabled; the part size is
+ * read from the huge-file partition size test property.
+ */
+public class ITestS3AContractMultipartUploader extends
+    AbstractContractMultipartUploaderTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class);
+
+  /** Part size in bytes; assigned in {@link #setup()}. */
+  private int partitionSize;
+
+  /**
+   * S3 requires a minimum part size of 5MB (except the last part).
+   * @return the partition size read from the test configuration in
+   * {@link #setup()}.
+   */
+  @Override
+  protected int partSizeInBytes() {
+    return partitionSize;
+  }
+
+  /**
+   * Number of payloads to upload in each test.
+   * @return 3
+   */
+  @Override
+  protected int getTestPayloadCount() {
+    return 3;
+  }
+
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return (S3AFileSystem) super.getFileSystem();
+  }
+
+  /**
+   * Create a configuration, possibly patching in S3Guard options.
+   * @return a configuration
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    maybeEnableS3Guard(conf);
+    return conf;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Set up the test: skip it unless scale tests are enabled, then read
+   * the partition size from the test configuration.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    Configuration conf = getContract().getConf();
+    boolean enabled = getTestPropertyBool(
+        conf,
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED);
+    assume("Scale test disabled: to enable set property " +
+            KEY_SCALE_TESTS_ENABLED,
+        enabled);
+    partitionSize = (int) getTestPropertyBytes(conf,
+        KEY_HUGE_PARTITION_SIZE,
+        DEFAULT_HUGE_PARTITION_SIZE);
+  }
+
+  /**
+   * Extend superclass teardown with actions to help clean up the S3 store,
+   * including aborting uploads under the test path.
+   * An IOException raised while aborting is logged and does not stop the
+   * superclass teardown from running.
+   */
+  @Override
+  public void teardown() throws Exception {
+    Path teardown = path("teardown").getParent();
+    S3AFileSystem fs = getFileSystem();
+    WriteOperationHelper helper = fs.getWriteOperationHelper();
+    try {
+      LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+      int count =
+          helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown));
+      LOG.info("Found {} incomplete uploads", count);
+    } catch (IOException e) {
+      LOG.warn("IOE in teardown", e);
+    }
+    super.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 0f7b418..ce2a98e 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -106,6 +106,11 @@ public interface S3ATestConstants {
   String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize";
 
   /**
+   * Size of partitions to upload: {@value}.
+   */
+  String DEFAULT_HUGE_PARTITION_SIZE = "8M";
+
+  /**
    * The default huge size is small —full 5GB+ scale tests are something
    * to run in long test runs on EC2 VMs. {@value}.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
new file mode 100644
index 0000000..35d0460
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*;
+import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test multipart upload support methods and classes.
+ */
+public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
+
+  @Test
+  public void testRoundTrip() throws Throwable {
+    Pair<Long, String> result = roundTrip("tag", 1);
+    assertEquals("tag", result.getRight());
+    assertEquals(1, result.getLeft().longValue());
+  }
+
+  @Test
+  public void testRoundTrip2() throws Throwable {
+    long len = 1L + Integer.MAX_VALUE;
+    Pair<Long, String> result = roundTrip("11223344",
+        len);
+    assertEquals("11223344", result.getRight());
+    assertEquals(len, result.getLeft().longValue());
+  }
+
+  @Test
+  public void testNoEtag() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload("", 1));
+  }
+
+  @Test
+  public void testNoLen() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload("tag", 0));
+  }
+
+  @Test
+  public void testBadPayload() throws Throwable {
+    intercept(EOFException.class,
+        () -> parsePartHandlePayload(new byte[0]));
+  }
+
+  @Test
+  public void testBadHeader() throws Throwable {
+    byte[] bytes = buildPartHandlePayload("tag", 1);
+    bytes[2]='f';
+    intercept(IOException.class, "header",
+        () -> parsePartHandlePayload(bytes));
+  }
+
+  private Pair<Long, String> roundTrip(final String tag, final long len) throws IOException {
+    byte[] bytes = buildPartHandlePayload(tag, len);
+    return parsePartHandlePayload(bytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
index 4df3912..55e4dc7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -83,7 +83,9 @@ public class TestStagingPartitionedJobCommit
           commit.setDestinationKey(key);
           commit.setUri("s3a://" + BUCKET + "/" + key);
           commit.setUploadId(UUID.randomUUID().toString());
-          commit.setEtags(new ArrayList<>());
+          ArrayList<String> etags = new ArrayList<>();
+          etags.add("tag1");
+          commit.setEtags(etags);
           pending.add(commit);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 02236eb..88a19d5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -64,7 +64,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbstractSTestS3AHugeFiles.class);
   public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
-  public static final String DEFAULT_PARTITION_SIZE = "8M";
+
   private Path scaleTestDir;
   private Path hugefile;
   private Path hugefileRenamed;
@@ -101,7 +101,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     Configuration conf = super.createScaleConfiguration();
     partitionSize = (int) getTestPropertyBytes(conf,
         KEY_HUGE_PARTITION_SIZE,
-        DEFAULT_PARTITION_SIZE);
+        DEFAULT_HUGE_PARTITION_SIZE);
     assertTrue("Partition size too small: " + partitionSize,
         partitionSize > MULTIPART_MIN_SIZE);
     conf.setLong(SOCKET_SEND_BUFFER, _1MB);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
index fe0af66..ec4c54a 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
@@ -108,6 +108,11 @@
   </property>
 
   <property>
+    <name>fs.contract.supports-multipartuploader</name>
+    <value>true</value>
+  </property>
+
+  <property>
     <name>fs.contract.supports-unix-permissions</name>
     <value>false</value>
   </property>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to