HADOOP-15576. S3A Multipart Uploader to work with S3Guard and encryption. Originally contributed by Ewan Higgs with refinements by Steve Loughran.
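For context, a minimal sketch of how a client might drive the MultipartUploader API touched by this patch, pieced together from the contract tests below (initialize, putPart, complete). The class name, destination path and payload are illustrative only and are not part of the commit; this is an editorial aid, not shipped code.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

/** Illustrative client of the MultipartUploader API; not part of this commit. */
public class MultipartUploadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dest = new Path(args[0]);            // e.g. s3a://bucket/path/file (illustrative)
    FileSystem fs = dest.getFileSystem(conf);
    // The factory resolves the uploader registered for the filesystem, e.g. the
    // S3AMultipartUploader$Factory declared in META-INF/services in this patch.
    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);

    // 1. Initialize: returns an opaque UploadHandle identifying the upload.
    UploadHandle upload = mpu.initialize(dest);

    // 2. Put parts, in any order; the implementation closes each input stream
    //    after reading the data.
    List<Pair<Integer, PartHandle>> parts = new ArrayList<>();
    byte[] data = "ThisIsPart1\n".getBytes(StandardCharsets.UTF_8);
    InputStream in = new ByteArrayInputStream(data);
    PartHandle part = mpu.putPart(dest, in, 1, upload, data.length);
    parts.add(Pair.of(1, part));

    // 3. Complete with a non-empty part list to materialize the destination file.
    PathHandle handle = mpu.complete(dest, parts, upload);
    System.out.println("Upload of " + dest + " completed, handle bytes: "
        + handle.toByteArray().length);
  }
}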
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ec97abb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ec97abb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ec97abb Branch: refs/heads/HDFS-12090 Commit: 2ec97abb2e93c1a8127e7a146c08e26454b583fa Parents: 4203bc7 Author: Ewan Higgs <[email protected]> Authored: Wed Aug 8 13:50:23 2018 +0200 Committer: Ewan Higgs <[email protected]> Committed: Wed Aug 8 13:50:23 2018 +0200 ---------------------------------------------------------------------- .../hadoop/fs/FileSystemMultipartUploader.java | 69 +++-- .../org/apache/hadoop/fs/MultipartUploader.java | 32 +- .../java/org/apache/hadoop/fs/PartHandle.java | 8 +- .../java/org/apache/hadoop/fs/PathHandle.java | 9 +- .../fs/AbstractSystemMultipartUploaderTest.java | 143 --------- .../TestLocalFileSystemMultipartUploader.java | 65 ---- .../AbstractContractMultipartUploaderTest.java | 300 +++++++++++++++++++ .../TestLocalFSContractMultipartUploader.java | 43 +++ .../hadoop/fs/TestHDFSMultipartUploader.java | 76 ----- .../hdfs/TestHDFSContractMultipartUploader.java | 58 ++++ .../hadoop/fs/s3a/S3AMultipartUploader.java | 177 +++++++---- .../hadoop/fs/s3a/WriteOperationHelper.java | 4 + ...rg.apache.hadoop.fs.MultipartUploaderFactory | 15 - ...rg.apache.hadoop.fs.MultipartUploaderFactory | 15 + .../s3a/ITestS3AContractMultipartUploader.java | 116 +++++++ .../apache/hadoop/fs/s3a/S3ATestConstants.java | 5 + .../fs/s3a/TestS3AMultipartUploaderSupport.java | 84 ++++++ .../TestStagingPartitionedJobCommit.java | 4 +- .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 4 +- .../src/test/resources/contract/s3a.xml | 5 + 20 files changed, 831 insertions(+), 401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java index b57ff3d..a700a9f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java @@ -16,12 +16,6 @@ */ package org.apache.hadoop.fs; -import com.google.common.base.Charsets; -import org.apache.commons.compress.utils.IOUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.FsPermission; - import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -29,13 +23,26 @@ import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; + +import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; + +import static org.apache.hadoop.fs.Path.mergePaths; + /** * A MultipartUploader that uses the basic FileSystem commands. * This is done in three stages: - * Init - create a temp _multipart directory. 
- * PutPart - copying the individual parts of the file to the temp directory. - * Complete - use {@link FileSystem#concat} to merge the files; and then delete - * the temp directory. + * <ul> + * <li>Init - create a temp {@code _multipart} directory.</li> + * <li>PutPart - copying the individual parts of the file to the temp + * directory.</li> + * <li>Complete - use {@link FileSystem#concat} to merge the files; + * and then delete the temp directory.</li> + * </ul> */ public class FileSystemMultipartUploader extends MultipartUploader { @@ -64,28 +71,44 @@ public class FileSystemMultipartUploader extends MultipartUploader { Path collectorPath = new Path(new String(uploadIdByteArray, 0, uploadIdByteArray.length, Charsets.UTF_8)); Path partPath = - Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR), + mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR), new Path(Integer.toString(partNumber) + ".part"))); - FSDataOutputStreamBuilder outputStream = fs.createFile(partPath); - FSDataOutputStream fsDataOutputStream = outputStream.build(); - IOUtils.copy(inputStream, fsDataOutputStream, 4096); - fsDataOutputStream.close(); + try(FSDataOutputStream fsDataOutputStream = + fs.createFile(partPath).build()) { + IOUtils.copy(inputStream, fsDataOutputStream, 4096); + } finally { + org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream); + } return BBPartHandle.from(ByteBuffer.wrap( partPath.toString().getBytes(Charsets.UTF_8))); } private Path createCollectorPath(Path filePath) { - return Path.mergePaths(filePath.getParent(), - Path.mergePaths(new Path(filePath.getName().split("\\.")[0]), - Path.mergePaths(new Path("_multipart"), + return mergePaths(filePath.getParent(), + mergePaths(new Path(filePath.getName().split("\\.")[0]), + mergePaths(new Path("_multipart"), new Path(Path.SEPARATOR)))); } + private PathHandle getPathHandle(Path filePath) throws IOException { + FileStatus status = fs.getFileStatus(filePath); + return fs.getPathHandle(status); + } + @Override @SuppressWarnings("deprecation") // rename w/ OVERWRITE public PathHandle complete(Path filePath, List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId) throws IOException { + + if (handles.isEmpty()) { + throw new IOException("Empty upload"); + } + // If destination already exists, we believe we already completed it. 
+ if (fs.exists(filePath)) { + return getPathHandle(filePath); + } + handles.sort(Comparator.comparing(Pair::getKey)); List<Path> partHandles = handles .stream() @@ -97,22 +120,26 @@ public class FileSystemMultipartUploader extends MultipartUploader { .collect(Collectors.toList()); Path collectorPath = createCollectorPath(filePath); - Path filePathInsideCollector = Path.mergePaths(collectorPath, + Path filePathInsideCollector = mergePaths(collectorPath, new Path(Path.SEPARATOR + filePath.getName())); fs.create(filePathInsideCollector).close(); fs.concat(filePathInsideCollector, partHandles.toArray(new Path[handles.size()])); fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE); fs.delete(collectorPath, true); - FileStatus status = fs.getFileStatus(filePath); - return fs.getPathHandle(status); + return getPathHandle(filePath); } @Override public void abort(Path filePath, UploadHandle uploadId) throws IOException { byte[] uploadIdByteArray = uploadId.toByteArray(); + Preconditions.checkArgument(uploadIdByteArray.length != 0, + "UploadId is empty"); Path collectorPath = new Path(new String(uploadIdByteArray, 0, uploadIdByteArray.length, Charsets.UTF_8)); + + // force a check for a file existing; raises FNFE if not found + fs.getFileStatus(collectorPath); fs.delete(collectorPath, true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java index 24a9216..47fd9f2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java @@ -21,17 +21,20 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; -import org.apache.commons.lang3.tuple.Pair; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; + /** * MultipartUploader is an interface for copying files multipart and across * multiple nodes. Users should: - * 1. Initialize an upload - * 2. Upload parts in any order - * 3. Complete the upload in order to have it materialize in the destination FS. + * <ol> + * <li>Initialize an upload</li> + * <li>Upload parts in any order</li> + * <li>Complete the upload in order to have it materialize in the destination + * FS</li> + * </ol> * * Implementers should make sure that the complete function should make sure * that 'complete' will reorder parts if the destination FS doesn't already @@ -45,7 +48,7 @@ public abstract class MultipartUploader { * Initialize a multipart upload. * @param filePath Target path for upload. * @return unique identifier associating part uploads. - * @throws IOException + * @throws IOException IO failure */ public abstract UploadHandle initialize(Path filePath) throws IOException; @@ -53,12 +56,13 @@ public abstract class MultipartUploader { * Put part as part of a multipart upload. It should be possible to have * parts uploaded in any order (or in parallel). * @param filePath Target path for upload (same as {@link #initialize(Path)}). - * @param inputStream Data for this part. + * @param inputStream Data for this part. 
Implementations MUST close this + * stream after reading in the data. * @param partNumber Index of the part relative to others. * @param uploadId Identifier from {@link #initialize(Path)}. * @param lengthInBytes Target length to read from the stream. * @return unique PartHandle identifier for the uploaded part. - * @throws IOException + * @throws IOException IO failure */ public abstract PartHandle putPart(Path filePath, InputStream inputStream, int partNumber, UploadHandle uploadId, long lengthInBytes) @@ -67,12 +71,12 @@ public abstract class MultipartUploader { /** * Complete a multipart upload. * @param filePath Target path for upload (same as {@link #initialize(Path)}. - * @param handles Identifiers with associated part numbers from - * {@link #putPart(Path, InputStream, int, UploadHandle, long)}. + * @param handles non-empty list of identifiers with associated part numbers + * from {@link #putPart(Path, InputStream, int, UploadHandle, long)}. * Depending on the backend, the list order may be significant. * @param multipartUploadId Identifier from {@link #initialize(Path)}. * @return unique PathHandle identifier for the uploaded file. - * @throws IOException + * @throws IOException IO failure or the handle list is empty. */ public abstract PathHandle complete(Path filePath, List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId) @@ -81,10 +85,10 @@ public abstract class MultipartUploader { /** * Aborts a multipart upload. * @param filePath Target path for upload (same as {@link #initialize(Path)}. - * @param multipartuploadId Identifier from {@link #initialize(Path)}. - * @throws IOException + * @param multipartUploadId Identifier from {@link #initialize(Path)}. + * @throws IOException IO failure */ - public abstract void abort(Path filePath, UploadHandle multipartuploadId) + public abstract void abort(Path filePath, UploadHandle multipartUploadId) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java index df70b74..47ce3ab 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java @@ -16,14 +16,14 @@ */ package org.apache.hadoop.fs; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - import java.io.Serializable; import java.nio.ByteBuffer; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + /** - * Opaque, serializable reference to an part id for multipart uploads. + * Opaque, serializable reference to a part id for multipart uploads. 
*/ @InterfaceAudience.Public @InterfaceStability.Evolving http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java index 60aa6a5..d5304ba 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java @@ -25,15 +25,16 @@ import org.apache.hadoop.classification.InterfaceStability; /** * Opaque, serializable reference to an entity in the FileSystem. May contain - * metadata sufficient to resolve or verify subsequent accesses indepedent of + * metadata sufficient to resolve or verify subsequent accesses independent of * other modifications to the FileSystem. */ @InterfaceAudience.Public @InterfaceStability.Evolving +@FunctionalInterface public interface PathHandle extends Serializable { /** - * @return Serialized from in bytes. + * @return Serialized form in bytes. */ default byte[] toByteArray() { ByteBuffer bb = bytes(); @@ -42,6 +43,10 @@ public interface PathHandle extends Serializable { return ret; } + /** + * Get the bytes of this path handle. + * @return the bytes to get to the process completing the upload. + */ ByteBuffer bytes(); @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java deleted file mode 100644 index f132089..0000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.fs; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; - -import org.junit.Test; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public abstract class AbstractSystemMultipartUploaderTest { - - abstract FileSystem getFS() throws IOException; - - abstract Path getBaseTestPath(); - - @Test - public void testMultipartUpload() throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= 100; ++i) { - String contents = "ThisIsPart" + i + "\n"; - sb.append(contents); - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - partHandles.add(Pair.of(i, partHandle)); - } - PathHandle fd = mpu.complete(file, partHandles, uploadHandle); - byte[] fdData = IOUtils.toByteArray(fs.open(fd)); - byte[] fileData = IOUtils.toByteArray(fs.open(file)); - String readString = new String(fdData); - assertEquals(sb.toString(), readString); - assertArrayEquals(fdData, fileData); - } - - @Test - public void testMultipartUploadReverseOrder() throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= 100; ++i) { - String contents = "ThisIsPart" + i + "\n"; - sb.append(contents); - } - for (int i = 100; i > 0; --i) { - String contents = "ThisIsPart" + i + "\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - partHandles.add(Pair.of(i, partHandle)); - } - PathHandle fd = mpu.complete(file, partHandles, uploadHandle); - byte[] fdData = IOUtils.toByteArray(fs.open(fd)); - byte[] fileData = IOUtils.toByteArray(fs.open(file)); - String readString = new String(fdData); - assertEquals(sb.toString(), readString); - assertArrayEquals(fdData, fileData); - } - - @Test - public void testMultipartUploadReverseOrderNoNContiguousPartNumbers() - throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (int i = 2; i <= 200; i += 2) { - String contents = "ThisIsPart" + i + "\n"; - sb.append(contents); - } - for (int i = 200; i > 0; i -= 2) { - String contents = "ThisIsPart" + i + "\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - partHandles.add(Pair.of(i, partHandle)); - } - PathHandle fd = mpu.complete(file, partHandles, uploadHandle); - byte[] fdData = 
IOUtils.toByteArray(fs.open(fd)); - byte[] fileData = IOUtils.toByteArray(fs.open(file)); - String readString = new String(fdData); - assertEquals(sb.toString(), readString); - assertArrayEquals(fdData, fileData); - } - - @Test - public void testMultipartUploadAbort() throws Exception { - FileSystem fs = getFS(); - Path file = new Path(getBaseTestPath(), "some-file"); - MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); - UploadHandle uploadHandle = mpu.initialize(file); - for (int i = 100; i >= 50; --i) { - String contents = "ThisIsPart" + i + "\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); - } - mpu.abort(file, uploadHandle); - - String contents = "ThisIsPart49\n"; - int len = contents.getBytes().length; - InputStream is = IOUtils.toInputStream(contents, "UTF-8"); - - try { - mpu.putPart(file, is, 49, uploadHandle, len); - fail("putPart should have thrown an exception"); - } catch (IOException ok) { - // ignore - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java deleted file mode 100644 index 21d01b6..0000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir; - -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.io.File; -import java.io.IOException; - -/** - * Test the FileSystemMultipartUploader on local file system. 
- */ -public class TestLocalFileSystemMultipartUploader - extends AbstractSystemMultipartUploaderTest { - - private static FileSystem fs; - private File tmp; - - @BeforeClass - public static void init() throws IOException { - fs = LocalFileSystem.getLocal(new Configuration()); - } - - @Before - public void setup() throws IOException { - tmp = getRandomizedTestDir(); - tmp.mkdirs(); - } - - @After - public void tearDown() throws IOException { - tmp.delete(); - } - - @Override - public FileSystem getFS() { - return fs; - } - - @Override - public Path getBaseTestPath() { - return new Path(tmp.getAbsolutePath()); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java new file mode 100644 index 0000000..c0e1600 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.contract; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.base.Charsets; +import org.junit.Test; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.BBUploadHandle; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.MultipartUploader; +import org.apache.hadoop.fs.MultipartUploaderFactory; +import org.apache.hadoop.fs.PartHandle; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathHandle; +import org.apache.hadoop.fs.UploadHandle; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +public abstract class AbstractContractMultipartUploaderTest extends + AbstractFSContractTestBase { + + /** + * The payload is the part number repeated for the length of the part. + * This makes checking the correctness of the upload straightforward. + * @param partNumber part number + * @return the bytes to upload. 
+ */ + private byte[] generatePayload(int partNumber) { + int sizeInBytes = partSizeInBytes(); + ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes); + for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) { + buffer.putInt(partNumber); + } + return buffer.array(); + } + + /** + * Load a path, make an MD5 digest. + * @param path path to load + * @return the digest array + * @throws IOException failure to read or digest the file. + */ + protected byte[] digest(Path path) throws IOException { + FileSystem fs = getFileSystem(); + try (InputStream in = fs.open(path)) { + byte[] fdData = IOUtils.toByteArray(in); + MessageDigest newDigest = DigestUtils.getMd5Digest(); + return newDigest.digest(fdData); + } + } + + /** + * Get the partition size in bytes to use for each upload. + * @return a number > 0 + */ + protected abstract int partSizeInBytes(); + + /** + * Get the number of test payloads to upload. + * @return a number > 1 + */ + protected int getTestPayloadCount() { + return 10; + } + + /** + * Assert that a multipart upload is successful. + * @throws Exception failure + */ + @Test + public void testSingleUpload() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testSingleUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + byte[] payload = generatePayload(1); + origDigest.update(payload); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle, + payload.length); + partHandles.add(Pair.of(1, partHandle)); + PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles, + origDigest, + payload.length); + + // Complete is idempotent + PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle); + assertArrayEquals("Path handles differ", fd.toByteArray(), + fd2.toByteArray()); + } + + private PathHandle completeUpload(final Path file, + final MultipartUploader mpu, + final UploadHandle uploadHandle, + final List<Pair<Integer, PartHandle>> partHandles, + final MessageDigest origDigest, + final int expectedLength) throws IOException { + PathHandle fd = mpu.complete(file, partHandles, uploadHandle); + + FileStatus status = verifyPathExists(getFileSystem(), + "Completed file", file); + assertEquals("length of " + status, + expectedLength, status.getLen()); + + assertArrayEquals("digest of source and " + file + + " differ", + origDigest.digest(), digest(file)); + return fd; + } + + /** + * Assert that a multipart upload is successful. 
+ * @throws Exception failure + */ + @Test + public void testMultipartUpload() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + final int payloadCount = getTestPayloadCount(); + for (int i = 1; i <= payloadCount; ++i) { + byte[] payload = generatePayload(i); + origDigest.update(payload); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + payloadCount * partSizeInBytes()); + } + + /** + * Assert that a multipart upload is successful even when the parts are + * given in the reverse order. + */ + @Test + public void testMultipartUploadReverseOrder() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUploadReverseOrder"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + final int payloadCount = getTestPayloadCount(); + for (int i = 1; i <= payloadCount; ++i) { + byte[] payload = generatePayload(i); + origDigest.update(payload); + } + for (int i = payloadCount; i > 0; --i) { + byte[] payload = generatePayload(i); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + payloadCount * partSizeInBytes()); + } + + /** + * Assert that a multipart upload is successful even when the parts are + * given in reverse order and the part numbers are not contiguous. + */ + @Test + public void testMultipartUploadReverseOrderNonContiguousPartNumbers() + throws Exception { + describe("Upload in reverse order and the part numbers are not contiguous"); + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + MessageDigest origDigest = DigestUtils.getMd5Digest(); + int payloadCount = 2 * getTestPayloadCount(); + for (int i = 2; i <= payloadCount; i += 2) { + byte[] payload = generatePayload(i); + origDigest.update(payload); + } + for (int i = payloadCount; i > 0; i -= 2) { + byte[] payload = generatePayload(i); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + completeUpload(file, mpu, uploadHandle, partHandles, origDigest, + getTestPayloadCount() * partSizeInBytes()); + } + + /** + * Assert that when we abort a multipart upload, the resulting file does + * not show up. 
+ */ + @Test + public void testMultipartUploadAbort() throws Exception { + describe("Upload and then abort it before completing"); + FileSystem fs = getFileSystem(); + Path file = path("testMultipartUploadAbort"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + for (int i = 20; i >= 10; --i) { + byte[] payload = generatePayload(i); + InputStream is = new ByteArrayInputStream(payload); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, + payload.length); + partHandles.add(Pair.of(i, partHandle)); + } + mpu.abort(file, uploadHandle); + + String contents = "ThisIsPart49\n"; + int len = contents.getBytes(Charsets.UTF_8).length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + + intercept(IOException.class, + () -> mpu.putPart(file, is, 49, uploadHandle, len)); + intercept(IOException.class, + () -> mpu.complete(file, partHandles, uploadHandle)); + + assertPathDoesNotExist("Uploaded file should not exist", file); + } + + /** + * Trying to abort from an invalid handle must fail. + */ + @Test + public void testAbortUnknownUpload() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testAbortUnknownUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + ByteBuffer byteBuffer = ByteBuffer.wrap( + "invalid-handle".getBytes(Charsets.UTF_8)); + UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer); + intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle)); + } + + /** + * Trying to abort with a handle of size 0 must fail. + */ + @Test + public void testAbortEmptyUploadHandle() throws Exception { + FileSystem fs = getFileSystem(); + Path file = path("testAbortEmptyUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]); + UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer); + intercept(IllegalArgumentException.class, + () -> mpu.abort(file, uploadHandle)); + } + + /** + * When we complete with no parts provided, it must fail. + */ + @Test + public void testCompleteEmptyUpload() throws Exception { + describe("Expect an empty MPU to fail, but still be abortable"); + FileSystem fs = getFileSystem(); + Path dest = path("testCompleteEmptyUpload"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle handle = mpu.initialize(dest); + intercept(IOException.class, + () -> mpu.complete(dest, new ArrayList<>(), handle)); + mpu.abort(dest, handle); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java new file mode 100644 index 0000000..a50d2e4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.contract.localfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test the FileSystemMultipartUploader on local file system. + */ +public class TestLocalFSContractMultipartUploader + extends AbstractContractMultipartUploaderTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } + + /** + * There is no real need to upload any particular size. + * @return 1 kilobyte + */ + @Override + protected int partSizeInBytes() { + return 1024; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java deleted file mode 100644 index 96c5093..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.fs; - -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TestName; - -import java.io.IOException; - -public class TestHDFSMultipartUploader - extends AbstractSystemMultipartUploaderTest { - - private static MiniDFSCluster cluster; - private Path tmp; - - @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void init() throws IOException { - HdfsConfiguration conf = new HdfsConfiguration(); - cluster = new MiniDFSCluster.Builder(conf, - GenericTestUtils.getRandomizedTestDir()) - .numDataNodes(1) - .build(); - cluster.waitClusterUp(); - } - - @AfterClass - public static void cleanup() throws IOException { - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - @Before - public void setup() throws IOException { - tmp = new Path(cluster.getFileSystem().getWorkingDirectory(), - name.getMethodName()); - cluster.getFileSystem().mkdirs(tmp); - } - - @Override - public FileSystem getFS() throws IOException { - return cluster.getFileSystem(); - } - - @Override - public Path getBaseTestPath() { - return tmp; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java new file mode 100644 index 0000000..f3a5265 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.contract.hdfs; + +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test MultipartUploader tests on HDFS. 
+ */ +public class TestHDFSContractMultipartUploader extends + AbstractContractMultipartUploaderTest { + + @BeforeClass + public static void createCluster() throws IOException { + HDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + HDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new HDFSContract(conf); + } + + /** + * HDFS doesn't have any restriction on the part size. + * @return 1KB + */ + @Override + protected int partSizeInBytes() { + return 1024; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java index 34c88d4..6a1df54 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java @@ -17,15 +17,26 @@ */ package org.apache.hadoop.fs.s3a; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; + +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BBPartHandle; @@ -37,13 +48,8 @@ import org.apache.hadoop.fs.PartHandle; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathHandle; import org.apache.hadoop.fs.UploadHandle; -import org.apache.hadoop.hdfs.DFSUtilClient; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Collectors; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A; /** * MultipartUploader for S3AFileSystem. This uses the S3 multipart @@ -53,6 +59,10 @@ public class S3AMultipartUploader extends MultipartUploader { private final S3AFileSystem s3a; + /** Header for Parts: {@value}. 
*/ + + public static final String HEADER = "S3A-part01"; + public S3AMultipartUploader(FileSystem fs, Configuration conf) { if (!(fs instanceof S3AFileSystem)) { throw new IllegalArgumentException( @@ -63,75 +73,72 @@ public class S3AMultipartUploader extends MultipartUploader { @Override public UploadHandle initialize(Path filePath) throws IOException { + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); String key = s3a.pathToKey(filePath); - InitiateMultipartUploadRequest request = - new InitiateMultipartUploadRequest(s3a.getBucket(), key); - LOG.debug("initialize request: {}", request); - InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request); - String uploadId = result.getUploadId(); + String uploadId = writeHelper.initiateMultiPartUpload(key); return BBUploadHandle.from(ByteBuffer.wrap( uploadId.getBytes(Charsets.UTF_8))); } @Override public PartHandle putPart(Path filePath, InputStream inputStream, - int partNumber, UploadHandle uploadId, long lengthInBytes) { + int partNumber, UploadHandle uploadId, long lengthInBytes) + throws IOException { + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); String key = s3a.pathToKey(filePath); - UploadPartRequest request = new UploadPartRequest(); byte[] uploadIdBytes = uploadId.toByteArray(); - request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length, - Charsets.UTF_8)); - request.setInputStream(inputStream); - request.setPartSize(lengthInBytes); - request.setPartNumber(partNumber); - request.setBucketName(s3a.getBucket()); - request.setKey(key); - LOG.debug("putPart request: {}", request); - UploadPartResult result = s3a.uploadPart(request); + String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8); + UploadPartRequest request = writeHelper.newUploadPartRequest(key, + uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L); + UploadPartResult result = writeHelper.uploadPart(request); String eTag = result.getETag(); - return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8))); + return BBPartHandle.from( + ByteBuffer.wrap( + buildPartHandlePayload(eTag, lengthInBytes))); } @Override public PathHandle complete(Path filePath, - List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) { - String key = s3a.pathToKey(filePath); - CompleteMultipartUploadRequest request = - new CompleteMultipartUploadRequest(); - request.setBucketName(s3a.getBucket()); - request.setKey(key); + List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) + throws IOException { byte[] uploadIdBytes = uploadId.toByteArray(); - request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length, - Charsets.UTF_8)); - List<PartETag> eTags = handles - .stream() - .map(handle -> { - byte[] partEtagBytes = handle.getRight().toByteArray(); - return new PartETag(handle.getLeft(), - new String(partEtagBytes, 0, partEtagBytes.length, - Charsets.UTF_8)); - }) - .collect(Collectors.toList()); - request.setPartETags(eTags); - LOG.debug("Complete request: {}", request); - CompleteMultipartUploadResult completeMultipartUploadResult = - s3a.getAmazonS3Client().completeMultipartUpload(request); - - byte[] eTag = DFSUtilClient.string2Bytes( - completeMultipartUploadResult.getETag()); + checkUploadId(uploadIdBytes); + if (handles.isEmpty()) { + throw new IOException("Empty upload"); + } + + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); + String key = s3a.pathToKey(filePath); + + String uploadIdStr = new 
String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8); + ArrayList<PartETag> eTags = new ArrayList<>(); + eTags.ensureCapacity(handles.size()); + long totalLength = 0; + for (Pair<Integer, PartHandle> handle : handles) { + byte[] payload = handle.getRight().toByteArray(); + Pair<Long, String> result = parsePartHandlePayload(payload); + totalLength += result.getLeft(); + eTags.add(new PartETag(handle.getLeft(), result.getRight())); + } + AtomicInteger errorCount = new AtomicInteger(0); + CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries( + key, uploadIdStr, eTags, totalLength, errorCount); + + byte[] eTag = result.getETag().getBytes(Charsets.UTF_8); return (PathHandle) () -> ByteBuffer.wrap(eTag); } @Override - public void abort(Path filePath, UploadHandle uploadId) { + public void abort(Path filePath, UploadHandle uploadId) throws IOException { + final byte[] uploadIdBytes = uploadId.toByteArray(); + checkUploadId(uploadIdBytes); + final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper(); String key = s3a.pathToKey(filePath); - byte[] uploadIdBytes = uploadId.toByteArray(); String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length, Charsets.UTF_8); - AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a - .getBucket(), key, uploadIdString); - LOG.debug("Abort request: {}", request); - s3a.getAmazonS3Client().abortMultipartUpload(request); + writeHelper.abortMultipartCommit(key, uploadIdString); } /** @@ -141,10 +148,64 @@ public class S3AMultipartUploader extends MultipartUploader { @Override protected MultipartUploader createMultipartUploader(FileSystem fs, Configuration conf) { - if (fs.getScheme().equals("s3a")) { + if (FS_S3A.equals(fs.getScheme())) { return new S3AMultipartUploader(fs, conf); } return null; } } + + private void checkUploadId(byte[] uploadId) throws IllegalArgumentException { + Preconditions.checkArgument(uploadId.length > 0, + "Empty UploadId is not valid"); + } + + /** + * Build the payload for marshalling. + * @param eTag upload etag + * @param len length + * @return a byte array to marshall. + * @throws IOException error writing the payload + */ + @VisibleForTesting + static byte[] buildPartHandlePayload(String eTag, long len) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(eTag), + "Empty etag"); + Preconditions.checkArgument(len > 0, + "Invalid length"); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try(DataOutputStream output = new DataOutputStream(bytes)) { + output.writeUTF(HEADER); + output.writeLong(len); + output.writeUTF(eTag); + } + return bytes.toByteArray(); + } + + /** + * Parse the payload marshalled as a part handle. 
+ * @param data handle data + * @return the length and etag + * @throws IOException error reading the payload + */ + static Pair<Long, String> parsePartHandlePayload(byte[] data) + throws IOException { + + try(DataInputStream input = + new DataInputStream(new ByteArrayInputStream(data))) { + final String header = input.readUTF(); + if (!HEADER.equals(header)) { + throw new IOException("Wrong header string: \"" + header + "\""); + } + final long len = input.readLong(); + final String etag = input.readUTF(); + if (len <= 0) { + throw new IOException("Negative length"); + } + return Pair.of(len, etag); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 46ca65c..a85a87f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -219,6 +219,10 @@ public class WriteOperationHelper { List<PartETag> partETags, long length, Retried retrying) throws IOException { + if (partETags.isEmpty()) { + throw new IOException( + "No upload parts in multipart upload to " + destKey); + } return invoker.retry("Completing multipart commit", destKey, true, retrying, http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory deleted file mode 100644 index 2e4bc24..0000000 --- a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory new file mode 100644 index 0000000..2e4bc24 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java new file mode 100644 index 0000000..d28f39b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.contract.s3a; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; + +import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE; + +/** + * Test MultipartUploader with S3A. + */ +public class ITestS3AContractMultipartUploader extends + AbstractContractMultipartUploaderTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class); + + private int partitionSize; + + /** + * S3 requires a minimum part size of 5MB (except the last part). + * @return 5MB + */ + @Override + protected int partSizeInBytes() { + return partitionSize; + } + + @Override + protected int getTestPayloadCount() { + return 3; + } + + @Override + public S3AFileSystem getFileSystem() { + return (S3AFileSystem) super.getFileSystem(); + } + + /** + * Create a configuration, possibly patching in S3Guard options. + * @return a configuration + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + maybeEnableS3Guard(conf); + return conf; + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public void setup() throws Exception { + super.setup(); + Configuration conf = getContract().getConf(); + boolean enabled = getTestPropertyBool( + conf, + KEY_SCALE_TESTS_ENABLED, + DEFAULT_SCALE_TESTS_ENABLED); + assume("Scale test disabled: to enable set property " + + KEY_SCALE_TESTS_ENABLED, + enabled); + partitionSize = (int) getTestPropertyBytes(conf, + KEY_HUGE_PARTITION_SIZE, + DEFAULT_HUGE_PARTITION_SIZE); + } + + /** + * Extend superclass teardown with actions to help clean up the S3 store, + * including aborting uploads under the test path. 
+ */ + @Override + public void teardown() throws Exception { + Path teardown = path("teardown").getParent(); + S3AFileSystem fs = getFileSystem(); + WriteOperationHelper helper = fs.getWriteOperationHelper(); + try { + LOG.info("Teardown: aborting outstanding uploads under {}", teardown); + int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown)); + LOG.info("Found {} incomplete uploads", count); + } catch (IOException e) { + LOG.warn("IOE in teardown", e); + } + super.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java index 0f7b418..ce2a98e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java @@ -106,6 +106,11 @@ public interface S3ATestConstants { String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize"; /** + * Size of partitions to upload: {@value}. + */ + String DEFAULT_HUGE_PARTITION_SIZE = "8M"; + + /** * The default huge size is small - full 5GB+ scale tests are something * to run in long test runs on EC2 VMs. {@value}. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java new file mode 100644 index 0000000..35d0460 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; +import java.io.IOException; + +import org.junit.Test; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*; +import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test multipart upload support methods and classes. 
+ */ +public class TestS3AMultipartUploaderSupport extends HadoopTestBase { + + @Test + public void testRoundTrip() throws Throwable { + Pair<Long, String> result = roundTrip("tag", 1); + assertEquals("tag", result.getRight()); + assertEquals(1, result.getLeft().longValue()); + } + + @Test + public void testRoundTrip2() throws Throwable { + long len = 1L + Integer.MAX_VALUE; + Pair<Long, String> result = roundTrip("11223344", + len); + assertEquals("11223344", result.getRight()); + assertEquals(len, result.getLeft().longValue()); + } + + @Test + public void testNoEtag() throws Throwable { + intercept(IllegalArgumentException.class, + () -> buildPartHandlePayload("", 1)); + } + + @Test + public void testNoLen() throws Throwable { + intercept(IllegalArgumentException.class, + () -> buildPartHandlePayload("tag", 0)); + } + + @Test + public void testBadPayload() throws Throwable { + intercept(EOFException.class, + () -> parsePartHandlePayload(new byte[0])); + } + + @Test + public void testBadHeader() throws Throwable { + byte[] bytes = buildPartHandlePayload("tag", 1); + bytes[2]='f'; + intercept(IOException.class, "header", + () -> parsePartHandlePayload(bytes)); + } + + private Pair<Long, String> roundTrip(final String tag, final long len) throws IOException { + byte[] bytes = buildPartHandlePayload(tag, len); + return parsePartHandlePayload(bytes); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java index 4df3912..55e4dc7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java @@ -83,7 +83,9 @@ public class TestStagingPartitionedJobCommit commit.setDestinationKey(key); commit.setUri("s3a://" + BUCKET + "/" + key); commit.setUploadId(UUID.randomUUID().toString()); - commit.setEtags(new ArrayList<>()); + ArrayList<String> etags = new ArrayList<>(); + etags.add("tag1"); + commit.setEtags(etags); pending.add(commit); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 02236eb..88a19d5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -64,7 +64,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger( AbstractSTestS3AHugeFiles.class); public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB; - public static final String DEFAULT_PARTITION_SIZE = "8M"; + private Path scaleTestDir; private Path hugefile; private Path 
hugefileRenamed; @@ -101,7 +101,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { Configuration conf = super.createScaleConfiguration(); partitionSize = (int) getTestPropertyBytes(conf, KEY_HUGE_PARTITION_SIZE, - DEFAULT_PARTITION_SIZE); + DEFAULT_HUGE_PARTITION_SIZE); assertTrue("Partition size too small: " + partitionSize, partitionSize > MULTIPART_MIN_SIZE); conf.setLong(SOCKET_SEND_BUFFER, _1MB); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ec97abb/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml index fe0af66..ec4c54a 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml +++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml @@ -108,6 +108,11 @@ </property> <property> + <name>fs.contract.supports-multipartuploader</name> + <value>true</value> + </property> + + <property> <name>fs.contract.supports-unix-permissions</name> <value>false</value> </property>
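
The META-INF/services resource added earlier in this patch registers org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory for discovery through java.util.ServiceLoader. A minimal sketch of that discovery path, assuming only the standard ServiceLoader API and the public org.apache.hadoop.fs.MultipartUploaderFactory class; the class name ListMultipartUploaderFactories below is illustrative and not part of the commit:

import java.util.ServiceLoader;

import org.apache.hadoop.fs.MultipartUploaderFactory;

public class ListMultipartUploaderFactories {
  public static void main(String[] args) {
    // ServiceLoader reads every
    // META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
    // resource on the classpath; with hadoop-aws present, the S3A factory
    // registered by this patch shows up as one of the providers.
    ServiceLoader<MultipartUploaderFactory> loader =
        ServiceLoader.load(MultipartUploaderFactory.class);
    for (MultipartUploaderFactory factory : loader) {
      System.out.println(factory.getClass().getName());
    }
  }
}

Hadoop callers are expected to go through MultipartUploaderFactory itself rather than ServiceLoader directly; the loop above only shows what the new resource file makes discoverable.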

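TestS3AMultipartUploaderSupport above pins down the byte payload stored inside each S3A PartHandle: an etag plus the length of the uploaded part. A minimal round-trip sketch using only the two static helpers the test exercises, buildPartHandlePayload(String, long) and parsePartHandlePayload(byte[]); the class name PartHandlePayloadRoundTrip and the sample etag are illustrative:

// Placed in the same package as the test so package-scoped helpers resolve.
package org.apache.hadoop.fs.s3a;

import java.io.IOException;

import org.apache.commons.lang3.tuple.Pair;

import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.buildPartHandlePayload;
import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload;

public class PartHandlePayloadRoundTrip {
  public static void main(String[] args) throws IOException {
    // Serialize an etag and the part length into the bytes backing a PartHandle.
    byte[] payload = buildPartHandlePayload("etag-0001", 5L * 1024 * 1024);

    // Deserialize: the left element is the length, the right is the etag,
    // matching the assertions in testRoundTrip() and testRoundTrip2().
    Pair<Long, String> parsed = parsePartHandlePayload(payload);
    System.out.println("etag=" + parsed.getRight()
        + ", length=" + parsed.getLeft());
  }
}

Carrying the length inside the handle presumably lets complete() total the final object size for the S3Guard metadata update without an extra HEAD request.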