This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new d2e861dcb5a HADOOP-15224. S3A: Add option to set checksum on S3 objects (#7396) (#7550) d2e861dcb5a is described below commit d2e861dcb5aebd8da12853e2e53d0ee50508fe01 Author: Raphael Azzolini <azzol...@amazon.com> AuthorDate: Fri Apr 4 09:17:46 2025 -0700 HADOOP-15224. S3A: Add option to set checksum on S3 objects (#7396) (#7550) Add the property fs.s3a.create.checksum.algorithm that allows users to specify a checksum algorithm (CRC32, CRC32C, SHA1, or SHA256) to be used by the AWS SDK to generate the checksum for object integrity check. Contributed by Raphael Azzolini --- .../java/org/apache/hadoop/fs/s3a/Constants.java | 9 ++ .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 4 + .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 + .../fs/s3a/commit/files/SinglePendingCommit.java | 19 +-- .../hadoop/fs/s3a/commit/files/UploadEtag.java | 136 +++++++++++++++++ .../fs/s3a/commit/impl/CommitOperations.java | 9 +- .../apache/hadoop/fs/s3a/impl/ChecksumSupport.java | 71 +++++++++ .../hadoop/fs/s3a/impl/RequestFactoryImpl.java | 50 ++++++ .../hadoop/fs/s3a/impl/S3AMultipartUploader.java | 77 +++++++++- .../src/site/markdown/tools/hadoop-aws/index.md | 9 ++ .../tools/hadoop-aws/troubleshooting_s3a.md | 22 +++ .../org/apache/hadoop/fs/s3a/ITestS3AChecksum.java | 120 +++++++++++++++ .../apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java | 18 ++- .../hadoop/fs/s3a/auth/ITestCustomSigner.java | 5 + .../fs/s3a/commit/ITestCommitOperations.java | 7 +- .../hadoop/fs/s3a/commit/files/TestUploadEtag.java | 169 +++++++++++++++++++++ .../s3a/commit/staging/TestStagingCommitter.java | 2 +- .../staging/TestStagingPartitionedJobCommit.java | 7 +- .../hadoop/fs/s3a/impl/TestChecksumSupport.java | 76 +++++++++ .../hadoop/fs/s3a/impl/TestRequestFactory.java | 84 +++++++++- .../s3a/impl/TestS3AMultipartUploaderSupport.java | 123 ++++++++++++++- 21 files changed, 983 insertions(+), 36 deletions(-) diff 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index d0cfd777813..2c479d1ddc2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1764,6 +1764,15 @@ private Constants() { */ public static final boolean CHECKSUM_VALIDATION_DEFAULT = false; + /** + * Indicates the algorithm used to create the checksum for the object + * to be uploaded to S3. Unset by default. It supports the following values: + * 'CRC32', 'CRC32C', 'SHA1', and 'SHA256' + * value:{@value} + */ + public static final String CHECKSUM_ALGORITHM = + "fs.s3a.create.checksum.algorithm"; + /** * Are extensions classes, such as {@code fs.s3a.aws.credentials.provider}, * going to be loaded from the same classloader that loaded diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 7b249a11c07..db1134dc5a2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -1064,6 +1064,10 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, return CompletedPart.builder() .eTag(response.eTag()) .partNumber(currentPartNumber) + .checksumCRC32(response.checksumCRC32()) + .checksumCRC32C(response.checksumCRC32C()) + .checksumSHA1(response.checksumSHA1()) + .checksumSHA256(response.checksumSHA256()) .build(); } catch (Exception e) { final IOException ex = e instanceof IOException diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 46a0d9da56e..4181fd65c97 100644 
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -118,6 +118,7 @@ import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl; import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; +import org.apache.hadoop.fs.s3a.impl.ChecksumSupport; import org.apache.hadoop.fs.s3a.impl.ClientManager; import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl; import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper; @@ -1336,6 +1337,7 @@ protected RequestFactory createRequestFactory() { .withStorageClass(storageClass) .withMultipartUploadEnabled(isMultipartUploadEnabled) .withPartUploadTimeout(partUploadTimeout) + .withChecksumAlgorithm(ChecksumSupport.getChecksumAlgorithm(getConf())) .build(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java index e4541ba4da3..aaa91bc757c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java @@ -71,7 +71,7 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommit> - implements Iterable<String> { + implements Iterable<UploadEtag> { /** * Serialization ID: {@value}. @@ -118,7 +118,7 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi private String text = ""; /** Ordered list of etags. */ - private List<String> etags; + private List<UploadEtag> etags; /** * Any custom extra data committer subclasses may choose to add. 
@@ -222,7 +222,7 @@ public void bindCommitData(List<CompletedPart> parts) throws ValidationFailure { for (CompletedPart part : parts) { verify(part.partNumber() == counter, "Expected part number %s but got %s", counter, part.partNumber()); - etags.add(part.eTag()); + etags.add(UploadEtag.fromCompletedPart(part)); counter++; } } @@ -237,9 +237,10 @@ public void validate() throws ValidationFailure { verify(length >= 0, "Invalid length: " + length); destinationPath(); verify(etags != null, "No etag list"); - validateCollectionClass(etags, String.class); - for (String etag : etags) { - verify(StringUtils.isNotEmpty(etag), "Empty etag"); + validateCollectionClass(etags, UploadEtag.class); + for (UploadEtag etag : etags) { + verify(etag != null && StringUtils.isNotEmpty(etag.getEtag()), + "Empty etag"); } if (extraData != null) { validateCollectionClass(extraData.keySet(), String.class); @@ -313,7 +314,7 @@ public int getPartCount() { * @return an iterator. */ @Override - public Iterator<String> iterator() { + public Iterator<UploadEtag> iterator() { return etags.iterator(); } @@ -442,11 +443,11 @@ public void setText(String text) { } /** @return ordered list of etags. */ - public List<String> getEtags() { + public List<UploadEtag> getEtags() { return etags; } - public void setEtags(List<String> etags) { + public void setEtags(List<UploadEtag> etags) { this.etags = etags; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java new file mode 100644 index 00000000000..79a8956810f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.files; + +import java.io.Serializable; +import java.util.StringJoiner; + +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.CompletedPart; + +/** + * Stores ETag and checksum values from {@link CompletedPart} responses from S3. + * These values need to be stored to be later passed to the + * {@link software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest + * CompleteMultipartUploadRequest} + */ +public class UploadEtag implements Serializable { + + /** + * Serialization ID: {@value}. 
+ */ + private static final long serialVersionUID = 1L; + + private String etag; + private String checksumAlgorithm; + private String checksum; + + public UploadEtag() { + } + + public UploadEtag(String etag, String checksumAlgorithm, String checksum) { + this.etag = etag; + this.checksumAlgorithm = checksumAlgorithm; + this.checksum = checksum; + } + + public String getEtag() { + return etag; + } + + public void setEtag(String etag) { + this.etag = etag; + } + + public String getChecksumAlgorithm() { + return checksumAlgorithm; + } + + public void setChecksumAlgorithm(String checksumAlgorithm) { + this.checksumAlgorithm = checksumAlgorithm; + } + + public String getChecksum() { + return checksum; + } + + public void setChecksum(String checksum) { + this.checksum = checksum; + } + + public static UploadEtag fromCompletedPart(CompletedPart completedPart) { + UploadEtag uploadEtag = new UploadEtag(); + uploadEtag.setEtag(completedPart.eTag()); + if (completedPart.checksumCRC32() != null) { + uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32.toString()); + uploadEtag.setChecksum(completedPart.checksumCRC32()); + } + if (completedPart.checksumCRC32C() != null) { + uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32_C.toString()); + uploadEtag.setChecksum(completedPart.checksumCRC32C()); + } + if (completedPart.checksumSHA1() != null) { + uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA1.toString()); + uploadEtag.setChecksum(completedPart.checksumSHA1()); + } + if (completedPart.checksumSHA256() != null) { + uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA256.toString()); + uploadEtag.setChecksum(completedPart.checksumSHA256()); + } + return uploadEtag; + } + + public static CompletedPart toCompletedPart(UploadEtag uploadEtag, int partNumber) { + final CompletedPart.Builder builder = CompletedPart.builder() + .partNumber(partNumber) + .eTag(uploadEtag.etag); + if (uploadEtag.checksumAlgorithm == null) { + return builder.build(); + } + final 
ChecksumAlgorithm checksumAlgorithm = ChecksumAlgorithm.fromValue( + uploadEtag.checksumAlgorithm); + switch (checksumAlgorithm) { + case CRC32: + builder.checksumCRC32(uploadEtag.checksum); + break; + case CRC32_C: + builder.checksumCRC32C(uploadEtag.checksum); + break; + case SHA1: + builder.checksumSHA1(uploadEtag.checksum); + break; + case SHA256: + builder.checksumSHA256(uploadEtag.checksum); + break; + default: + // do nothing + } + return builder.build(); + } + + @Override + public String toString() { + return new StringJoiner(", ", UploadEtag.class.getSimpleName() + "[", "]") + .add("serialVersionUID='" + serialVersionUID + "'") + .add("etag='" + etag + "'") + .add("checksumAlgorithm='" + checksumAlgorithm + "'") + .add("checksum='" + checksum + "'") + .toString(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java index 14bd4cc2f7d..c73b07ccf75 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.s3a.WriteOperations; import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.files.UploadEtag; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; @@ -165,9 +166,9 @@ public CommitOperations(S3AFileSystem fs, * @param tagIds list of tags * @return same list, now in numbered tuples */ - public static List<CompletedPart> toPartEtags(List<String> tagIds) { + public static List<CompletedPart> toPartEtags(List<UploadEtag> tagIds) { return IntStream.range(0, tagIds.size()) - 
.mapToObj(i -> CompletedPart.builder().partNumber(i + 1).eTag(tagIds.get(i)).build()) + .mapToObj(i -> UploadEtag.toCompletedPart(tagIds.get(i), i + 1)) .collect(Collectors.toList()); } @@ -655,6 +656,10 @@ private List<CompletedPart> uploadFileData( parts.add(CompletedPart.builder() .partNumber(partNumber) .eTag(response.eTag()) + .checksumCRC32(response.checksumCRC32()) + .checksumCRC32C(response.checksumCRC32C()) + .checksumSHA1(response.checksumSHA1()) + .checksumSHA256(response.checksumSHA256()) .build()); } return parts; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java new file mode 100644 index 00000000000..b14f5f7bd23 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.util.Set; + +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ConfigurationHelper; + +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM; + +/** + * Utility class to support operations on S3 object checksum. + */ +public final class ChecksumSupport { + + private ChecksumSupport() { + } + + /** + * Checksum algorithms that are supported by S3A. + */ + private static final Set<ChecksumAlgorithm> SUPPORTED_CHECKSUM_ALGORITHMS = ImmutableSet.of( + ChecksumAlgorithm.CRC32, + ChecksumAlgorithm.CRC32_C, + ChecksumAlgorithm.SHA1, + ChecksumAlgorithm.SHA256); + + /** + * Get the checksum algorithm to be used for data integrity check of the objects in S3. + * This operation includes validating if the provided value is a supported checksum algorithm. + * @param conf configuration to scan + * @return the checksum algorithm to be passed on S3 requests + * @throws IllegalArgumentException if the checksum algorithm is not known or not supported + */ + public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) { + return ConfigurationHelper.resolveEnum(conf, + CHECKSUM_ALGORITHM, + ChecksumAlgorithm.class, + configValue -> { + if (StringUtils.isBlank(configValue)) { + return null; + } + if (ChecksumAlgorithm.CRC32_C.toString().equalsIgnoreCase(configValue)) { + // In case the configuration value is CRC32C, without underscore. 
+ return ChecksumAlgorithm.CRC32_C; + } + throw new IllegalArgumentException("Checksum algorithm is not supported: " + configValue); + }); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 00d9368aa58..f03b83764b8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -27,6 +27,7 @@ import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -62,6 +63,7 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT; +import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; @@ -138,6 +140,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private final Duration partUploadTimeout; + /** + * Indicates the algorithm used to create the checksum for the object to be uploaded to S3. + */ + private final ChecksumAlgorithm checksumAlgorithm; + /** * Constructor. * @param builder builder with all the configuration. 
@@ -153,6 +160,7 @@ protected RequestFactoryImpl( this.storageClass = builder.storageClass; this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; this.partUploadTimeout = builder.partUploadTimeout; + this.checksumAlgorithm = builder.checksumAlgorithm; } /** @@ -235,6 +243,10 @@ private CopyObjectRequest.Builder buildCopyObjectRequest() { copyObjectRequestBuilder.contentEncoding(contentEncoding); } + if (checksumAlgorithm != null) { + copyObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm); + } + return copyObjectRequestBuilder; } @@ -377,6 +389,10 @@ private PutObjectRequest.Builder buildPutObjectRequest(long length, boolean isDi putObjectRequestBuilder.contentEncoding(contentEncoding); } + if (checksumAlgorithm != null) { + putObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm); + } + return putObjectRequestBuilder; } @@ -526,6 +542,10 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( requestBuilder.storageClass(storageClass); } + if (checksumAlgorithm != null) { + requestBuilder.checksumAlgorithm(checksumAlgorithm); + } + return prepareRequest(requestBuilder); } @@ -539,6 +559,16 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB CompleteMultipartUploadRequest.Builder requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + // Correct SSE-C request parameters are required for this request when + // specifying checksums for each part + if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == SSE_C) { + EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets) + .ifPresent(base64customerKey -> requestBuilder + .sseCustomerAlgorithm(ServerSideEncryption.AES256.name()) + .sseCustomerKey(base64customerKey) + .sseCustomerKeyMD5( + Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)))); + } return 
prepareRequest(requestBuilder); } @@ -618,6 +648,11 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( // Set the request timeout for the part upload setRequestTimeout(builder, partUploadTimeout); + + if (checksumAlgorithm != null) { + builder.checksumAlgorithm(checksumAlgorithm); + } + return prepareRequest(builder); } @@ -732,6 +767,11 @@ public static final class RequestFactoryBuilder { */ private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT; + /** + * Indicates the algorithm used to create the checksum for the object to be uploaded to S3. + */ + private ChecksumAlgorithm checksumAlgorithm; + private RequestFactoryBuilder() { } @@ -841,6 +881,16 @@ public RequestFactoryBuilder withPartUploadTimeout(final Duration value) { partUploadTimeout = value; return this; } + + /** + * Indicates the algorithm used to create the checksum for the object to be uploaded to S3. + * @param value new value + * @return the builder + */ + public RequestFactoryBuilder withChecksumAlgorithm(final ChecksumAlgorithm value) { + checksumAlgorithm = value; + return this; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java index e00319c3dc5..17c331d1ccf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; @@ -54,6 +55,7 @@ import org.apache.hadoop.fs.UploadHandle; import org.apache.hadoop.fs.impl.AbstractMultipartUploader; import org.apache.hadoop.fs.s3a.WriteOperations; +import org.apache.hadoop.fs.s3a.commit.files.UploadEtag; 
import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.util.Preconditions; @@ -160,6 +162,13 @@ public CompletableFuture<PartHandle> putPart( UploadPartResponse response = writeOperations.uploadPart(request, body, statistics); statistics.partPut(lengthInBytes); String eTag = response.eTag(); + String checksumAlgorithm = null; + String checksum = null; + final Map.Entry<String, String> extractedChecksum = extractChecksum(response); + if (extractedChecksum != null) { + checksumAlgorithm = extractedChecksum.getKey(); + checksum = extractedChecksum.getValue(); + } return BBPartHandle.from( ByteBuffer.wrap( buildPartHandlePayload( @@ -167,7 +176,9 @@ public CompletableFuture<PartHandle> putPart( uploadIdString, partNumber, eTag, - lengthInBytes))); + lengthInBytes, + checksumAlgorithm, + checksum))); }); } @@ -203,8 +214,9 @@ public CompletableFuture<PathHandle> complete( payload.validate(uploadIdStr, filePath); ids.add(payload.getPartNumber()); totalLength += payload.getLen(); - eTags.add( - CompletedPart.builder().partNumber(handle.getKey()).eTag(payload.getEtag()).build()); + final UploadEtag uploadEtag = new UploadEtag(payload.getEtag(), + payload.getChecksumAlgorithm(), payload.getChecksum()); + eTags.add(UploadEtag.toCompletedPart(uploadEtag, handle.getKey())); } Preconditions.checkArgument(ids.size() == count, "Duplicate PartHandles"); @@ -270,6 +282,8 @@ public CompletableFuture<Integer> abortUploadsUnderPath(final Path path) * @param partNumber part number from response * @param etag upload etag * @param len length + * @param checksumAlgorithm checksum algorithm + * @param checksum checksum content * @return a byte array to marshall. 
* @throws IOException error writing the payload */ @@ -279,10 +293,12 @@ static byte[] buildPartHandlePayload( final String uploadId, final int partNumber, final String etag, - final long len) + final long len, + final String checksumAlgorithm, + final String checksum) throws IOException { - return new PartHandlePayload(path, uploadId, partNumber, len, etag) + return new PartHandlePayload(path, uploadId, partNumber, len, etag, checksumAlgorithm, checksum) .toBytes(); } @@ -308,11 +324,34 @@ static PartHandlePayload parsePartHandlePayload( final int partNumber = input.readInt(); final long len = input.readLong(); final String etag = input.readUTF(); + String checksumAlgorithm = null; + String checksum = null; + if (input.available() > 0) { + checksumAlgorithm = input.readUTF(); + checksum = input.readUTF(); + } if (len < 0) { throw new IOException("Negative length"); } - return new PartHandlePayload(path, uploadId, partNumber, len, etag); + return new PartHandlePayload(path, uploadId, partNumber, len, etag, checksumAlgorithm, + checksum); + } + } + + static Map.Entry<String, String> extractChecksum(final UploadPartResponse uploadPartResponse) { + if (uploadPartResponse.checksumCRC32() != null) { + return new AbstractMap.SimpleEntry<>("CRC32", uploadPartResponse.checksumCRC32()); + } + if (uploadPartResponse.checksumCRC32C() != null) { + return new AbstractMap.SimpleEntry<>("CRC32C", uploadPartResponse.checksumCRC32C()); + } + if (uploadPartResponse.checksumSHA1() != null) { + return new AbstractMap.SimpleEntry<>("SHA1", uploadPartResponse.checksumSHA1()); + } + if (uploadPartResponse.checksumSHA256() != null) { + return new AbstractMap.SimpleEntry<>("SHA256", uploadPartResponse.checksumSHA256()); } + return null; } /** @@ -332,12 +371,18 @@ static final class PartHandlePayload { private final String etag; + private final String checksumAlgorithm; + + private final String checksum; + private PartHandlePayload( final String path, final String uploadId, final int 
partNumber, final long len, - final String etag) { + final String etag, + final String checksumAlgorithm, + final String checksum) { Preconditions.checkArgument(StringUtils.isNotEmpty(etag), "Empty etag"); Preconditions.checkArgument(StringUtils.isNotEmpty(path), @@ -346,12 +391,18 @@ private PartHandlePayload( "Empty uploadId"); Preconditions.checkArgument(len >= 0, "Invalid length"); + Preconditions.checkArgument((StringUtils.isNotEmpty(checksumAlgorithm) && + StringUtils.isNotEmpty(checksum)) || + (StringUtils.isEmpty(checksumAlgorithm) && StringUtils.isEmpty(checksum)), + "Checksum algorithm and checksum should be both provided or empty"); this.path = path; this.uploadId = uploadId; this.partNumber = partNumber; this.len = len; this.etag = etag; + this.checksumAlgorithm = checksumAlgorithm; + this.checksum = checksum; } public String getPath() { @@ -374,6 +425,14 @@ public String getUploadId() { return uploadId; } + public String getChecksumAlgorithm() { + return checksumAlgorithm; + } + + public String getChecksum() { + return checksum; + } + public byte[] toBytes() throws IOException { Preconditions.checkArgument(StringUtils.isNotEmpty(etag), @@ -389,6 +448,10 @@ public byte[] toBytes() output.writeInt(partNumber); output.writeLong(len); output.writeUTF(etag); + if (checksumAlgorithm != null && checksum != null) { + output.writeUTF(checksumAlgorithm); + output.writeUTF(checksum); + } } return bytes.toByteArray(); } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 59c1ceac1c5..afed3397f56 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1307,6 +1307,15 @@ Here are some the S3A properties for use in production. 
</description> </property> +<property> + <name>fs.s3a.create.checksum.algorithm</name> + <description> + Indicates the algorithm used to create the checksum for the object + to be uploaded to S3. Unset by default. It supports the following values: + 'CRC32', 'CRC32C', 'SHA1', and 'SHA256' + </description> +</property> + <!-- The switch to turn S3A auditing on or off. --> diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index 1bdb2c66cb4..6520e0dc026 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -387,6 +387,28 @@ Happens if a multipart upload is being completed, but one of the parts is missin * A magic committer job's list of in-progress uploads somehow got corrupted * Bug in the S3A codebase (rare, but not impossible...) +### <a name="object_lock_parameters"></a> Status Code 400 "Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters" +``` +software.amazon.awssdk.services.s3.model.S3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: S3, Status Code: 400, Request ID: 1122334455, Extended Request ID: ...): +InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: S3, Status Code: 400, Request ID: 1122334455, Extended Request ID: ...) +``` + +This error happens if the S3 bucket has [Object Lock](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html) enabled. + +The Content-MD5 or x-amz-sdk-checksum-algorithm header is required for any request to upload an object +with a retention period configured using Object Lock. 
+ +If Object Lock can't be disabled in the S3 bucket, set a checksum algorithm to be used in the +uploads via the `fs.s3a.create.checksum.algorithm` property. Note that enabling checksum on uploads can +affect the performance. + +```xml +<property> + <name>fs.s3a.create.checksum.algorithm</name> + <value>SHA256</value> +</property> +``` + ## <a name="access_denied"></a> Access Denied HTTP error codes 401 and 403 are mapped to `AccessDeniedException` in the S3A connector. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java new file mode 100644 index 00000000000..f477f46ceb6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.impl.ChecksumSupport; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.rm; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM; +import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS; + +/** + * Tests S3 checksum algorithm. + * If CHECKSUM_ALGORITHM config is not set in auth-keys.xml, + * SHA256 algorithm will be picked. + */ +public class ITestS3AChecksum extends AbstractS3ATestBase { + + private static final ChecksumAlgorithm DEFAULT_CHECKSUM_ALGORITHM = ChecksumAlgorithm.SHA256; + + private ChecksumAlgorithm checksumAlgorithm; + + private static final int[] SIZES = { + 1, 2, 3, 4, 5, 254, 255, 256, 257, 2 ^ 12 - 1 + }; + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, + CHECKSUM_ALGORITHM, + REJECT_OUT_OF_SPAN_OPERATIONS); + S3ATestUtils.disableFilesystemCaching(conf); + checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf); + if (checksumAlgorithm == null) { + checksumAlgorithm = DEFAULT_CHECKSUM_ALGORITHM; + LOG.info("No checksum algorithm found in configuration, will use default {}", + checksumAlgorithm); + conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString()); + } + conf.setBoolean(REJECT_OUT_OF_SPAN_OPERATIONS, false); + return conf; + } + + @Test + public void testChecksum() throws IOException { + for (int size : SIZES) { + 
validateChecksumForFilesize(size); + } + } + + private void validateChecksumForFilesize(int len) throws IOException { + describe("Create a file of size " + len); + String src = String.format("%s-%04x", methodName.getMethodName(), len); + Path path = writeThenReadFile(src, len); + assertChecksum(path); + rm(getFileSystem(), path, false, false); + } + + private void assertChecksum(Path path) throws IOException { + final String key = getFileSystem().pathToKey(path); + HeadObjectRequest.Builder requestBuilder = getFileSystem().getRequestFactory() + .newHeadObjectRequestBuilder(key) + .checksumMode(ChecksumMode.ENABLED); + HeadObjectResponse headObject = getFileSystem().getS3AInternals() + .getAmazonS3Client("Call head object with checksum enabled") + .headObject(requestBuilder.build()); + switch (checksumAlgorithm) { + case CRC32: + Assertions.assertThat(headObject.checksumCRC32()) + .describedAs("headObject.checksumCRC32()") + .isNotNull(); + break; + case CRC32_C: + Assertions.assertThat(headObject.checksumCRC32C()) + .describedAs("headObject.checksumCRC32C()") + .isNotNull(); + break; + case SHA1: + Assertions.assertThat(headObject.checksumSHA1()) + .describedAs("headObject.checksumSHA1()") + .isNotNull(); + break; + case SHA256: + Assertions.assertThat(headObject.checksumSHA256()) + .describedAs("headObject.checksumSHA256()") + .isNotNull(); + break; + default: + fail("Checksum algorithm not supported: " + checksumAlgorithm); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java index 28a443f04cd..5b8e1dc4300 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java @@ -19,7 +19,7 @@ package org.apache.hadoop.fs.s3a; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A; -import 
static org.junit.Assert.assertEquals; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.when; @@ -27,6 +27,7 @@ import java.net.URI; import java.util.Date; +import org.assertj.core.api.Assertions; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; @@ -86,7 +87,20 @@ public void testDeleteOnExit() throws Exception { testFs.deleteOnExit(path); testFs.close(); - assertEquals(0, testFs.getDeleteOnDnExitCount()); + Assertions.assertThat(testFs.getDeleteOnDnExitCount()).isEqualTo(0); + } + + @Test + public void testCreateRequestFactoryWithInvalidChecksumAlgorithm() throws Exception { + Configuration conf = createConfiguration(); + conf.set(Constants.CHECKSUM_ALGORITHM, "INVALID"); + TestS3AFileSystem testFs = new TestS3AFileSystem(); + URI uri = URI.create(FS_S3A + "://" + BUCKET); + final IllegalArgumentException exception = intercept(IllegalArgumentException.class, + () -> testFs.initialize(uri, conf)); + Assertions.assertThat(exception.getMessage()) + .describedAs("Error message should say that INVALID is not supported") + .isEqualTo("Checksum algorithm is not supported: INVALID"); } private ArgumentMatcher<HeadObjectRequest> correctGetMetadataRequest( diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java index 58bb2a5e491..e6eb60ad061 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java @@ -217,6 +217,11 @@ private Configuration createTestConfig(String identifier) { conf.set(TEST_ID_KEY, identifier); conf.set(TEST_REGION_KEY, regionName); + // 
Having the checksum algorithm in this test causes + // x-amz-sdk-checksum-algorithm specified, but no corresponding + // x-amz-checksum-* or x-amz-trailer headers were found + conf.unset(Constants.CHECKSUM_ALGORITHM); + // make absolutely sure there is no caching. disableFilesystemCaching(conf); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index 9e0bdd2cd34..34b856c21b7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.auth.ProgressCounter; +import org.apache.hadoop.fs.s3a.commit.files.UploadEtag; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.impl.CommitContext; import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations; @@ -442,11 +443,11 @@ private Path validatePendingCommitData(String filename, Assertions.assertThat(persisted.getSaved()) .describedAs("saved timestamp in %s", persisted) .isGreaterThan(0); - List<String> etags = persisted.getEtags(); - Assertions.assertThat(etags) + List<UploadEtag> uploadEtags = persisted.getEtags(); + Assertions.assertThat(uploadEtags) .describedAs("Etag list") .hasSize(1); - Assertions.assertThat(CommitOperations.toPartEtags(etags)) + Assertions.assertThat(uploadEtags) .describedAs("Etags to parts") .hasSize(1); return pendingDataPath; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java new file mode 100644 index 00000000000..e0d19591c40 --- /dev/null +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.files; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.CompletedPart; + +public class TestUploadEtag { + + @Test + public void testFromCompletedPartCRC32() { + final CompletedPart completedPart = CompletedPart.builder() + .eTag("tag") + .checksumCRC32("checksum") + .build(); + final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart); + Assertions.assertThat(uploadEtag.getEtag()) + .describedAs("Etag mismatch") + .isEqualTo("tag"); + Assertions.assertThat(uploadEtag.getChecksumAlgorithm()) + .describedAs("Checksum algorithm should be CRC32") + .isEqualTo("CRC32"); + Assertions.assertThat(uploadEtag.getChecksum()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testFromCompletedPartCRC32C() { + final CompletedPart completedPart = CompletedPart.builder() + .eTag("tag") + .checksumCRC32C("checksum") + .build(); + final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart); + 
Assertions.assertThat(uploadEtag.getEtag()) + .describedAs("Etag mismatch") + .isEqualTo("tag"); + Assertions.assertThat(uploadEtag.getChecksumAlgorithm()) + .describedAs("Checksum algorithm should be CRC32C") + .isEqualTo("CRC32C"); + Assertions.assertThat(uploadEtag.getChecksum()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testFromCompletedPartSHA1() { + final CompletedPart completedPart = CompletedPart.builder() + .eTag("tag") + .checksumSHA1("checksum") + .build(); + final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart); + Assertions.assertThat(uploadEtag.getEtag()) + .describedAs("Etag mismatch") + .isEqualTo("tag"); + Assertions.assertThat(uploadEtag.getChecksumAlgorithm()) + .describedAs("Checksum algorithm should be SHA1") + .isEqualTo("SHA1"); + Assertions.assertThat(uploadEtag.getChecksum()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testFromCompletedPartSHA256() { + final CompletedPart completedPart = CompletedPart.builder() + .eTag("tag") + .checksumSHA256("checksum") + .build(); + final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart); + Assertions.assertThat(uploadEtag.getEtag()) + .describedAs("Etag mismatch") + .isEqualTo("tag"); + Assertions.assertThat(uploadEtag.getChecksumAlgorithm()) + .describedAs("Checksum algorithm should be SHA256") + .isEqualTo("SHA256"); + Assertions.assertThat(uploadEtag.getChecksum()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testFromCompletedPartNoChecksum() { + final CompletedPart completedPart = CompletedPart.builder() + .eTag("tag") + .build(); + final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart); + Assertions.assertThat(uploadEtag.getEtag()) + .describedAs("Etag mismatch") + .isEqualTo("tag"); + Assertions.assertThat(uploadEtag.getChecksumAlgorithm()) + .describedAs("uploadEtag.getChecksumAlgorithm()") + .isNull(); 
+ Assertions.assertThat(uploadEtag.getChecksum()) + .describedAs("uploadEtag.getChecksum()") + .isNull(); + } + + @Test + public void testToCompletedPartCRC32() { + final UploadEtag uploadEtag = new UploadEtag("tag", "CRC32", "checksum"); + final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1); + Assertions.assertThat(completedPart.checksumCRC32()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testToCompletedPartCRC32C() { + final UploadEtag uploadEtag = new UploadEtag("tag", "CRC32C", "checksum"); + final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1); + Assertions.assertThat(completedPart.checksumCRC32C()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testToCompletedPartSHA1() { + final UploadEtag uploadEtag = new UploadEtag("tag", "SHA1", "checksum"); + final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1); + Assertions.assertThat(completedPart.checksumSHA1()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testToCompletedPartSHA256() { + final UploadEtag uploadEtag = new UploadEtag("tag", "SHA256", "checksum"); + final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1); + Assertions.assertThat(completedPart.checksumSHA256()) + .describedAs("Checksum mismatch") + .isEqualTo("checksum"); + } + + @Test + public void testToCompletedPartNoChecksum() { + final UploadEtag uploadEtag = new UploadEtag("tag", null, null); + final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1); + Assertions.assertThat(completedPart.checksumCRC32()) + .describedAs("completedPart.checksumCRC32()") + .isNull(); + Assertions.assertThat(completedPart.checksumCRC32C()) + .describedAs("completedPart.checksumCRC32C()") + .isNull(); + Assertions.assertThat(completedPart.checksumSHA1()) + .describedAs("completedPart.checksumSHA1()") + .isNull(); + 
Assertions.assertThat(completedPart.checksumSHA256()) + .describedAs("completedPart.checksumSHA256()") + .isNull(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java index 28bd8b878e5..63b79c230fd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java @@ -780,7 +780,7 @@ private static void assertValidUpload(Map<String, List<String>> parts, for (int i = 0; i < tags.size(); i += 1) { assertEquals("Should commit the correct part tags", - tags.get(i), commit.getEtags().get(i)); + tags.get(i), commit.getEtags().get(i).getEtag()); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java index 28161979f0b..1113b54b247 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.s3a.MockS3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.files.UploadEtag; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.impl.CommitContext; @@ -100,9 +101,9 @@ protected ActiveCommit listPendingUploadsToCommit( commit.setDestinationKey(key); commit.setUri("s3a://" + BUCKET + "/" + key); 
commit.setUploadId(uploadId); - ArrayList<String> etags = new ArrayList<>(); - etags.add("tag1"); - commit.setEtags(etags); + ArrayList<UploadEtag> uploadEtags = new ArrayList<>(); + uploadEtags.add(new UploadEtag("tag1", null, null)); + commit.setEtags(uploadEtags); pendingSet.add(commit); // register the upload so commit operations are not rejected getMockResults().addUpload(uploadId, key); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java new file mode 100644 index 00000000000..5e8ee5d8c76 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; + +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM; + +public class TestChecksumSupport { + + @Test + public void testGetSupportedChecksumAlgorithmCRC32() { + testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.CRC32); + } + + @Test + public void testGetSupportedChecksumAlgorithmCRC32C() { + testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.CRC32_C); + } + + @Test + public void testGetSupportedChecksumAlgorithmSHA1() { + testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.SHA1); + } + + @Test + public void testGetSupportedChecksumAlgorithmSHA256() { + testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.SHA256); + } + + private void testGetSupportedChecksumAlgorithm(ChecksumAlgorithm checksumAlgorithm) { + final Configuration conf = new Configuration(); + conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString()); + Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf)) + .describedAs("Checksum algorithm must match value set in the configuration") + .isEqualTo(checksumAlgorithm); + } + + @Test + public void testGetChecksumAlgorithmWhenNull() { + final Configuration conf = new Configuration(); + conf.unset(CHECKSUM_ALGORITHM); + Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf)) + .describedAs("If configuration is not set, checksum algorithm must be null") + .isNull(); + } + + @Test + public void testGetNotSupportedChecksumAlgorithm() { + final Configuration conf = new Configuration(); + conf.set(CHECKSUM_ALGORITHM, "INVALID"); + Assertions.assertThatThrownBy(() -> ChecksumSupport.getChecksumAlgorithm(conf)) + .describedAs("Invalid checksum algorithm should throw an exception") + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index c779062f518..2285ed06c2b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -19,22 +19,29 @@ package org.apache.hadoop.fs.s3a.impl; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Base64; import java.util.concurrent.atomic.AtomicLong; +import org.junit.Test; import software.amazon.awssdk.awscore.AwsRequest; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import org.assertj.core.api.Assertions; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Request; +import software.amazon.awssdk.services.s3.model.ServerSideEncryption; import software.amazon.awssdk.services.s3.model.UploadPartRequest; - +import software.amazon.awssdk.utils.Md5Utils; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; @@ -275,4 +282,77 @@ public void testUploadTimeouts() throws Throwable { assertApiTimeouts(partDuration, upload); } + + @Test + public void testRequestFactoryWithChecksumAlgorithmCRC32() throws IOException { + 
testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.CRC32); + } + + @Test + public void testRequestFactoryWithChecksumAlgorithmCRC32C() throws IOException { + testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.CRC32_C); + } + + @Test + public void testRequestFactoryWithChecksumAlgorithmSHA1() throws IOException { + testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.SHA1); + } + + @Test + public void testRequestFactoryWithChecksumAlgorithmSHA256() throws IOException { + testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.SHA256); + } + + private void testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm checksumAlgorithm) + throws IOException { + String path = "path"; + String path2 = "path2"; + HeadObjectResponse md = HeadObjectResponse.builder().contentLength(128L).build(); + + RequestFactory factory = RequestFactoryImpl.builder() + .withBucket("bucket") + .withChecksumAlgorithm(checksumAlgorithm) + .build(); + createFactoryObjects(factory); + + final CopyObjectRequest copyObjectRequest = factory.newCopyObjectRequestBuilder(path, + path2, md).build(); + Assertions.assertThat(copyObjectRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm); + + final PutObjectRequest putObjectRequest = factory.newPutObjectRequestBuilder(path, + PutObjectOptions.keepingDirs(), 1024, false).build(); + Assertions.assertThat(putObjectRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm); + + final CreateMultipartUploadRequest multipartUploadRequest = + factory.newMultipartUploadRequestBuilder(path, null).build(); + Assertions.assertThat(multipartUploadRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm); + + final UploadPartRequest uploadPartRequest = factory.newUploadPartRequestBuilder(path, + "id", 2, true, 128_000_000).build(); + Assertions.assertThat(uploadPartRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm); + } + + @Test + public void testCompleteMultipartUploadRequestWithChecksumAlgorithmAndSSEC() throws IOException { + final 
byte[] encryptionKey = "encryptionKey".getBytes(StandardCharsets.UTF_8); + final String encryptionKeyBase64 = Base64.getEncoder() + .encodeToString(encryptionKey); + final String encryptionKeyMd5 = Md5Utils.md5AsBase64(encryptionKey); + final EncryptionSecrets encryptionSecrets = new EncryptionSecrets(S3AEncryptionMethods.SSE_C, + encryptionKeyBase64, null); + RequestFactory factory = RequestFactoryImpl.builder() + .withBucket("bucket") + .withChecksumAlgorithm(ChecksumAlgorithm.CRC32_C) + .withEncryptionSecrets(encryptionSecrets) + .build(); + createFactoryObjects(factory); + + final CompleteMultipartUploadRequest request = + factory.newCompleteMultipartUploadRequestBuilder("path", "1", new ArrayList<>()) + .build(); + Assertions.assertThat(request.sseCustomerAlgorithm()) + .isEqualTo(ServerSideEncryption.AES256.name()); + Assertions.assertThat(request.sseCustomerKey()).isEqualTo(encryptionKeyBase64); + Assertions.assertThat(request.sseCustomerKeyMD5()).isEqualTo(encryptionKeyMd5); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java index 71305aa6633..afdad31440a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java @@ -20,14 +20,18 @@ import java.io.EOFException; import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; import org.junit.Test; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; import org.apache.hadoop.test.HadoopTestBase; import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.PartHandlePayload; import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.buildPartHandlePayload; import static 
org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.parsePartHandlePayload; +import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.extractChecksum; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -41,35 +45,60 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase { @Test public void testRoundTrip() throws Throwable { - PartHandlePayload result = roundTrip(999, "tag", 1); + PartHandlePayload result = roundTrip(999, "tag", 1, null, null); assertEquals(PATH, result.getPath()); assertEquals(UPLOAD, result.getUploadId()); assertEquals(999, result.getPartNumber()); assertEquals("tag", result.getEtag()); assertEquals(1, result.getLen()); + Assertions.assertThat(result.getChecksumAlgorithm()) + .describedAs("Checksum algorithm must not be present").isNull(); + Assertions.assertThat(result.getChecksum()) + .describedAs("Checksum must not be generated").isNull(); } @Test public void testRoundTrip2() throws Throwable { long len = 1L + Integer.MAX_VALUE; PartHandlePayload result = - roundTrip(1, "11223344", len); + roundTrip(1, "11223344", len, null, null); assertEquals(1, result.getPartNumber()); assertEquals("11223344", result.getEtag()); assertEquals(len, result.getLen()); + Assertions.assertThat(result.getChecksumAlgorithm()) + .describedAs("Checksum algorithm must not be present").isNull(); + Assertions.assertThat(result.getChecksum()) + .describedAs("Checksum must not be generated").isNull(); + } + + @Test + public void testRoundTripWithChecksum() throws Throwable { + PartHandlePayload result = roundTrip(999, "tag", 1, + "SHA256", "checksum"); + assertEquals(PATH, result.getPath()); + assertEquals(UPLOAD, result.getUploadId()); + assertEquals(999, result.getPartNumber()); + assertEquals("tag", result.getEtag()); + assertEquals(1, result.getLen()); + Assertions.assertThat(result.getChecksumAlgorithm()) + .describedAs("Expect the checksum algorithm to be SHA256") + .isEqualTo("SHA256"); + 
Assertions.assertThat(result.getChecksum()) + .describedAs("Checksum must be set") + .isEqualTo("checksum"); } @Test public void testNoEtag() throws Throwable { intercept(IllegalArgumentException.class, () -> buildPartHandlePayload(PATH, UPLOAD, - 0, "", 1)); + 0, "", 1, null, null)); } @Test public void testNoLen() throws Throwable { intercept(IllegalArgumentException.class, - () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1)); + () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1, null, null)); } @Test @@ -80,17 +109,97 @@ public void testBadPayload() throws Throwable { @Test public void testBadHeader() throws Throwable { - byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1); + byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1, null, null); bytes[2] = 'f'; intercept(IOException.class, "header", () -> parsePartHandlePayload(bytes)); } + @Test + public void testNoChecksumAlgorithm() throws Exception { + intercept(IllegalArgumentException.class, + () -> buildPartHandlePayload(PATH, UPLOAD, + 999, "tag", 1, "", "checksum")); + } + + @Test + public void testNoChecksum() throws Exception { + intercept(IllegalArgumentException.class, + () -> buildPartHandlePayload(PATH, UPLOAD, + 999, "tag", 1, "SHA256", "")); + } + + @Test + public void testExtractChecksumCRC32() { + final UploadPartResponse uploadPartResponse = UploadPartResponse.builder() + .checksumCRC32("checksum") + .build(); + final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse); + Assertions.assertThat(checksum.getKey()) + .describedAs("Expect the checksum algorithm to be CRC32") + .isEqualTo("CRC32"); + Assertions.assertThat(checksum.getValue()) + .describedAs("Checksum must be set") + .isEqualTo("checksum"); + } + + @Test + public void testExtractChecksumCRC32C() { + final UploadPartResponse uploadPartResponse = UploadPartResponse.builder() + .checksumCRC32C("checksum") + .build(); + final Map.Entry<String, String> checksum = 
extractChecksum(uploadPartResponse); + Assertions.assertThat(checksum.getKey()) + .describedAs("Expect the checksum algorithm to be CRC32C") + .isEqualTo("CRC32C"); + Assertions.assertThat(checksum.getValue()) + .describedAs("Checksum must be set") + .isEqualTo("checksum"); + } + + @Test + public void testExtractChecksumSHA1() { + final UploadPartResponse uploadPartResponse = UploadPartResponse.builder() + .checksumSHA1("checksum") + .build(); + final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse); + Assertions.assertThat(checksum.getKey()) + .describedAs("Expect the checksum algorithm to be SHA1") + .isEqualTo("SHA1"); + Assertions.assertThat(checksum.getValue()) + .describedAs("Checksum must be set") + .isEqualTo("checksum"); + } + + @Test + public void testExtractChecksumSHA256() { + final UploadPartResponse uploadPartResponse = UploadPartResponse.builder() + .checksumSHA256("checksum") + .build(); + final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse); + Assertions.assertThat(checksum.getKey()) + .describedAs("Expect the checksum algorithm to be SHA256") + .isEqualTo("SHA256"); + Assertions.assertThat(checksum.getValue()) + .describedAs("Checksum must be set") + .isEqualTo("checksum"); + } + + @Test + public void testExtractChecksumEmpty() { + final UploadPartResponse uploadPartResponse = UploadPartResponse.builder().build(); + final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse); + assertNull(checksum); + } + private PartHandlePayload roundTrip( int partNumber, String tag, - long len) throws IOException { - byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len); + long len, + String checksumAlgorithm, + String checksum) throws IOException { + byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len, + checksumAlgorithm, checksum); return parsePartHandlePayload(bytes); } } --------------------------------------------------------------------- To 
unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org