This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d2e861dcb5a HADOOP-15224. S3A: Add option to set checksum on S3 
objects (#7396) (#7550)
d2e861dcb5a is described below

commit d2e861dcb5aebd8da12853e2e53d0ee50508fe01
Author: Raphael Azzolini <azzol...@amazon.com>
AuthorDate: Fri Apr 4 09:17:46 2025 -0700

    HADOOP-15224. S3A: Add option to set checksum on S3 objects (#7396) (#7550)
    
    
    Add the property fs.s3a.create.checksum.algorithm that allows users to specify a 
checksum algorithm (CRC32, CRC32C, SHA1, or SHA256) to be used by the AWS SDK 
to generate the checksum for object integrity checks.
    
    Contributed by Raphael Azzolini
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   9 ++
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java |   4 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   2 +
 .../fs/s3a/commit/files/SinglePendingCommit.java   |  19 +--
 .../hadoop/fs/s3a/commit/files/UploadEtag.java     | 136 +++++++++++++++++
 .../fs/s3a/commit/impl/CommitOperations.java       |   9 +-
 .../apache/hadoop/fs/s3a/impl/ChecksumSupport.java |  71 +++++++++
 .../hadoop/fs/s3a/impl/RequestFactoryImpl.java     |  50 ++++++
 .../hadoop/fs/s3a/impl/S3AMultipartUploader.java   |  77 +++++++++-
 .../src/site/markdown/tools/hadoop-aws/index.md    |   9 ++
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  22 +++
 .../org/apache/hadoop/fs/s3a/ITestS3AChecksum.java | 120 +++++++++++++++
 .../apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java  |  18 ++-
 .../hadoop/fs/s3a/auth/ITestCustomSigner.java      |   5 +
 .../fs/s3a/commit/ITestCommitOperations.java       |   7 +-
 .../hadoop/fs/s3a/commit/files/TestUploadEtag.java | 169 +++++++++++++++++++++
 .../s3a/commit/staging/TestStagingCommitter.java   |   2 +-
 .../staging/TestStagingPartitionedJobCommit.java   |   7 +-
 .../hadoop/fs/s3a/impl/TestChecksumSupport.java    |  76 +++++++++
 .../hadoop/fs/s3a/impl/TestRequestFactory.java     |  84 +++++++++-
 .../s3a/impl/TestS3AMultipartUploaderSupport.java  | 123 ++++++++++++++-
 21 files changed, 983 insertions(+), 36 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index d0cfd777813..2c479d1ddc2 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1764,6 +1764,15 @@ private Constants() {
    */
   public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;
 
+  /**
+   * Indicates the algorithm used to create the checksum for the object
+   * to be uploaded to S3. Unset by default. It supports the following values:
+   * 'CRC32', 'CRC32C', 'SHA1', and 'SHA256'.
+   * Value: {@value}.
+   */
+  public static final String CHECKSUM_ALGORITHM =
+      "fs.s3a.create.checksum.algorithm";
+
   /**
    * Are extensions classes, such as {@code fs.s3a.aws.credentials.provider},
    * going to be loaded from the same classloader that loaded
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 7b249a11c07..db1134dc5a2 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -1064,6 +1064,10 @@ private void uploadBlockAsync(final 
S3ADataBlocks.DataBlock block,
               return CompletedPart.builder()
                   .eTag(response.eTag())
                   .partNumber(currentPartNumber)
+                  .checksumCRC32(response.checksumCRC32())
+                  .checksumCRC32C(response.checksumCRC32C())
+                  .checksumSHA1(response.checksumSHA1())
+                  .checksumSHA256(response.checksumSHA256())
                   .build();
             } catch (Exception e) {
               final IOException ex = e instanceof IOException
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 46a0d9da56e..4181fd65c97 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -118,6 +118,7 @@
 import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
 import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
 import org.apache.hadoop.fs.s3a.impl.ClientManager;
 import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
 import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
@@ -1336,6 +1337,7 @@ protected RequestFactory createRequestFactory() {
         .withStorageClass(storageClass)
         .withMultipartUploadEnabled(isMultipartUploadEnabled)
         .withPartUploadTimeout(partUploadTimeout)
+        .withChecksumAlgorithm(ChecksumSupport.getChecksumAlgorithm(getConf()))
         .build();
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
index e4541ba4da3..aaa91bc757c 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -71,7 +71,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class SinglePendingCommit extends 
PersistentCommitData<SinglePendingCommit>
-    implements Iterable<String> {
+    implements Iterable<UploadEtag> {
 
   /**
    * Serialization ID: {@value}.
@@ -118,7 +118,7 @@ public class SinglePendingCommit extends 
PersistentCommitData<SinglePendingCommi
   private String text = "";
 
   /** Ordered list of etags. */
-  private List<String> etags;
+  private List<UploadEtag> etags;
 
   /**
    * Any custom extra data committer subclasses may choose to add.
@@ -222,7 +222,7 @@ public void bindCommitData(List<CompletedPart> parts) 
throws ValidationFailure {
     for (CompletedPart part : parts) {
       verify(part.partNumber() == counter,
           "Expected part number %s but got %s", counter, part.partNumber());
-      etags.add(part.eTag());
+      etags.add(UploadEtag.fromCompletedPart(part));
       counter++;
     }
   }
@@ -237,9 +237,10 @@ public void validate() throws ValidationFailure {
     verify(length >= 0, "Invalid length: " + length);
     destinationPath();
     verify(etags != null, "No etag list");
-    validateCollectionClass(etags, String.class);
-    for (String etag : etags) {
-      verify(StringUtils.isNotEmpty(etag), "Empty etag");
+    validateCollectionClass(etags, UploadEtag.class);
+    for (UploadEtag etag : etags) {
+      verify(etag != null && StringUtils.isNotEmpty(etag.getEtag()),
+          "Empty etag");
     }
     if (extraData != null) {
       validateCollectionClass(extraData.keySet(), String.class);
@@ -313,7 +314,7 @@ public int getPartCount() {
    * @return an iterator.
    */
   @Override
-  public Iterator<String> iterator() {
+  public Iterator<UploadEtag> iterator() {
     return etags.iterator();
   }
 
@@ -442,11 +443,11 @@ public void setText(String text) {
   }
 
   /** @return ordered list of etags. */
-  public List<String> getEtags() {
+  public List<UploadEtag> getEtags() {
     return etags;
   }
 
-  public void setEtags(List<String> etags) {
+  public void setEtags(List<UploadEtag> etags) {
     this.etags = etags;
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java
new file mode 100644
index 00000000000..79a8956810f
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import java.io.Serializable;
+import java.util.StringJoiner;
+
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+/**
+ * Stores ETag and checksum values from {@link CompletedPart} responses from 
S3.
+ * These values need to be stored so that they can later be passed to the
+ * {@link 
software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest
+ * CompleteMultipartUploadRequest}.
+ */
+public class UploadEtag implements Serializable {
+
+  /**
+   * Serialization ID: {@value}.
+   */
+  private static final long serialVersionUID = 1L;
+
+  private String etag;
+  private String checksumAlgorithm;
+  private String checksum;
+
+  public UploadEtag() {
+  }
+
+  public UploadEtag(String etag, String checksumAlgorithm, String checksum) {
+    this.etag = etag;
+    this.checksumAlgorithm = checksumAlgorithm;
+    this.checksum = checksum;
+  }
+
+  public String getEtag() {
+    return etag;
+  }
+
+  public void setEtag(String etag) {
+    this.etag = etag;
+  }
+
+  public String getChecksumAlgorithm() {
+    return checksumAlgorithm;
+  }
+
+  public void setChecksumAlgorithm(String checksumAlgorithm) {
+    this.checksumAlgorithm = checksumAlgorithm;
+  }
+
+  public String getChecksum() {
+    return checksum;
+  }
+
+  public void setChecksum(String checksum) {
+    this.checksum = checksum;
+  }
+
+  public static UploadEtag fromCompletedPart(CompletedPart completedPart) {
+    UploadEtag uploadEtag = new UploadEtag();
+    uploadEtag.setEtag(completedPart.eTag());
+    if (completedPart.checksumCRC32() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32.toString());
+      uploadEtag.setChecksum(completedPart.checksumCRC32());
+    }
+    if (completedPart.checksumCRC32C() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32_C.toString());
+      uploadEtag.setChecksum(completedPart.checksumCRC32C());
+    }
+    if (completedPart.checksumSHA1() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA1.toString());
+      uploadEtag.setChecksum(completedPart.checksumSHA1());
+    }
+    if (completedPart.checksumSHA256() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA256.toString());
+      uploadEtag.setChecksum(completedPart.checksumSHA256());
+    }
+    return uploadEtag;
+  }
+
+  public static CompletedPart toCompletedPart(UploadEtag uploadEtag, int 
partNumber) {
+    final CompletedPart.Builder builder = CompletedPart.builder()
+        .partNumber(partNumber)
+        .eTag(uploadEtag.etag);
+    if (uploadEtag.checksumAlgorithm == null) {
+      return builder.build();
+    }
+    final ChecksumAlgorithm checksumAlgorithm = ChecksumAlgorithm.fromValue(
+        uploadEtag.checksumAlgorithm);
+    switch (checksumAlgorithm) {
+    case CRC32:
+      builder.checksumCRC32(uploadEtag.checksum);
+      break;
+    case CRC32_C:
+      builder.checksumCRC32C(uploadEtag.checksum);
+      break;
+    case SHA1:
+      builder.checksumSHA1(uploadEtag.checksum);
+      break;
+    case SHA256:
+      builder.checksumSHA256(uploadEtag.checksum);
+      break;
+    default:
+      // do nothing
+    }
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", UploadEtag.class.getSimpleName() + "[", "]")
+        .add("serialVersionUID='" + serialVersionUID + "'")
+        .add("etag='" + etag + "'")
+        .add("checksumAlgorithm='" + checksumAlgorithm + "'")
+        .add("checksum='" + checksum + "'")
+        .toString();
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
index 14bd4cc2f7d..c73b07ccf75 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.fs.s3a.WriteOperations;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
@@ -165,9 +166,9 @@ public CommitOperations(S3AFileSystem fs,
    * @param tagIds list of tags
    * @return same list, now in numbered tuples
    */
-  public static List<CompletedPart> toPartEtags(List<String> tagIds) {
+  public static List<CompletedPart> toPartEtags(List<UploadEtag> tagIds) {
     return IntStream.range(0, tagIds.size())
-        .mapToObj(i -> CompletedPart.builder().partNumber(i + 
1).eTag(tagIds.get(i)).build())
+        .mapToObj(i -> UploadEtag.toCompletedPart(tagIds.get(i), i + 1))
         .collect(Collectors.toList());
   }
 
@@ -655,6 +656,10 @@ private List<CompletedPart> uploadFileData(
       parts.add(CompletedPart.builder()
           .partNumber(partNumber)
           .eTag(response.eTag())
+          .checksumCRC32(response.checksumCRC32())
+          .checksumCRC32C(response.checksumCRC32C())
+          .checksumSHA1(response.checksumSHA1())
+          .checksumSHA256(response.checksumSHA256())
           .build());
     }
     return parts;
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
new file mode 100644
index 00000000000..b14f5f7bd23
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Set;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ConfigurationHelper;
+
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+
+/**
+ * Utility class to support operations on S3 object checksum.
+ */
+public final class ChecksumSupport {
+
+  private ChecksumSupport() {
+  }
+
+  /**
+   * Checksum algorithms that are supported by S3A.
+   */
+  private static final Set<ChecksumAlgorithm> SUPPORTED_CHECKSUM_ALGORITHMS = 
ImmutableSet.of(
+      ChecksumAlgorithm.CRC32,
+      ChecksumAlgorithm.CRC32_C,
+      ChecksumAlgorithm.SHA1,
+      ChecksumAlgorithm.SHA256);
+
+  /**
+   * Get the checksum algorithm to be used for data integrity check of the 
objects in S3.
+   * This operation includes validating if the provided value is a supported 
checksum algorithm.
+   * @param conf configuration to scan
+   * @return the checksum algorithm to be passed on S3 requests
+   * @throws IllegalArgumentException if the checksum algorithm is not known 
or not supported
+   */
+  public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) {
+    return ConfigurationHelper.resolveEnum(conf,
+        CHECKSUM_ALGORITHM,
+        ChecksumAlgorithm.class,
+        configValue -> {
+          if (StringUtils.isBlank(configValue)) {
+            return null;
+          }
+          if 
(ChecksumAlgorithm.CRC32_C.toString().equalsIgnoreCase(configValue)) {
+            // In case the configuration value is CRC32C, without underscore.
+            return ChecksumAlgorithm.CRC32_C;
+          }
+          throw new IllegalArgumentException("Checksum algorithm is not 
supported: " + configValue);
+        });
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index 00d9368aa58..f03b83764b8 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -27,6 +27,7 @@
 
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -62,6 +63,7 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
@@ -138,6 +140,11 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final Duration partUploadTimeout;
 
+  /**
+   * Indicates the algorithm used to create the checksum for the object to be 
uploaded to S3.
+   */
+  private final ChecksumAlgorithm checksumAlgorithm;
+
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -153,6 +160,7 @@ protected RequestFactoryImpl(
     this.storageClass = builder.storageClass;
     this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
     this.partUploadTimeout = builder.partUploadTimeout;
+    this.checksumAlgorithm = builder.checksumAlgorithm;
   }
 
   /**
@@ -235,6 +243,10 @@ private CopyObjectRequest.Builder buildCopyObjectRequest() 
{
       copyObjectRequestBuilder.contentEncoding(contentEncoding);
     }
 
+    if (checksumAlgorithm != null) {
+      copyObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return copyObjectRequestBuilder;
   }
 
@@ -377,6 +389,10 @@ private PutObjectRequest.Builder 
buildPutObjectRequest(long length, boolean isDi
       putObjectRequestBuilder.contentEncoding(contentEncoding);
     }
 
+    if (checksumAlgorithm != null) {
+      putObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return putObjectRequestBuilder;
   }
 
@@ -526,6 +542,10 @@ public CreateMultipartUploadRequest.Builder 
newMultipartUploadRequestBuilder(
       requestBuilder.storageClass(storageClass);
     }
 
+    if (checksumAlgorithm != null) {
+      requestBuilder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return prepareRequest(requestBuilder);
   }
 
@@ -539,6 +559,16 @@ public CompleteMultipartUploadRequest.Builder 
newCompleteMultipartUploadRequestB
     CompleteMultipartUploadRequest.Builder requestBuilder =
         
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
             
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+    // Correct SSE-C request parameters are required for this request when
+    // specifying checksums for each part
+    if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == 
SSE_C) {
+      EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
+          .ifPresent(base64customerKey -> requestBuilder
+              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+              .sseCustomerKey(base64customerKey)
+              .sseCustomerKeyMD5(
+                  
Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
+    }
 
     return prepareRequest(requestBuilder);
   }
@@ -618,6 +648,11 @@ public UploadPartRequest.Builder 
newUploadPartRequestBuilder(
 
     // Set the request timeout for the part upload
     setRequestTimeout(builder, partUploadTimeout);
+
+    if (checksumAlgorithm != null) {
+      builder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return prepareRequest(builder);
   }
 
@@ -732,6 +767,11 @@ public static final class RequestFactoryBuilder {
      */
     private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
 
+    /**
+     * Indicates the algorithm used to create the checksum for the object to 
be uploaded to S3.
+     */
+    private ChecksumAlgorithm checksumAlgorithm;
+
     private RequestFactoryBuilder() {
     }
 
@@ -841,6 +881,16 @@ public RequestFactoryBuilder withPartUploadTimeout(final 
Duration value) {
       partUploadTimeout = value;
       return this;
     }
+
+    /**
+     * Indicates the algorithm used to create the checksum for the object to 
be uploaded to S3.
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withChecksumAlgorithm(final ChecksumAlgorithm 
value) {
+      checksumAlgorithm = value;
+      return this;
+    }
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
index e00319c3dc5..17c331d1ccf 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
@@ -26,6 +26,7 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -54,6 +55,7 @@
 import org.apache.hadoop.fs.UploadHandle;
 import org.apache.hadoop.fs.impl.AbstractMultipartUploader;
 import org.apache.hadoop.fs.s3a.WriteOperations;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.util.Preconditions;
@@ -160,6 +162,13 @@ public CompletableFuture<PartHandle> putPart(
           UploadPartResponse response = writeOperations.uploadPart(request, 
body, statistics);
           statistics.partPut(lengthInBytes);
           String eTag = response.eTag();
+          String checksumAlgorithm = null;
+          String checksum = null;
+          final Map.Entry<String, String> extractedChecksum = 
extractChecksum(response);
+          if (extractedChecksum != null) {
+            checksumAlgorithm = extractedChecksum.getKey();
+            checksum = extractedChecksum.getValue();
+          }
           return BBPartHandle.from(
               ByteBuffer.wrap(
                   buildPartHandlePayload(
@@ -167,7 +176,9 @@ public CompletableFuture<PartHandle> putPart(
                       uploadIdString,
                       partNumber,
                       eTag,
-                      lengthInBytes)));
+                      lengthInBytes,
+                      checksumAlgorithm,
+                      checksum)));
         });
   }
 
@@ -203,8 +214,9 @@ public CompletableFuture<PathHandle> complete(
       payload.validate(uploadIdStr, filePath);
       ids.add(payload.getPartNumber());
       totalLength += payload.getLen();
-      eTags.add(
-          
CompletedPart.builder().partNumber(handle.getKey()).eTag(payload.getEtag()).build());
+      final UploadEtag uploadEtag = new UploadEtag(payload.getEtag(),
+          payload.getChecksumAlgorithm(), payload.getChecksum());
+      eTags.add(UploadEtag.toCompletedPart(uploadEtag, handle.getKey()));
     }
     Preconditions.checkArgument(ids.size() == count,
         "Duplicate PartHandles");
@@ -270,6 +282,8 @@ public CompletableFuture<Integer> 
abortUploadsUnderPath(final Path path)
    * @param partNumber part number from response
    * @param etag upload etag
    * @param len length
+   * @param checksumAlgorithm checksum algorithm
+   * @param checksum checksum content
    * @return a byte array to marshall.
    * @throws IOException error writing the payload
    */
@@ -279,10 +293,12 @@ static byte[] buildPartHandlePayload(
       final String uploadId,
       final int partNumber,
       final String etag,
-      final long len)
+      final long len,
+      final String checksumAlgorithm,
+      final String checksum)
       throws IOException {
 
-    return new PartHandlePayload(path, uploadId, partNumber, len, etag)
+    return new PartHandlePayload(path, uploadId, partNumber, len, etag, 
checksumAlgorithm, checksum)
         .toBytes();
   }
 
@@ -308,11 +324,34 @@ static PartHandlePayload parsePartHandlePayload(
       final int partNumber = input.readInt();
       final long len = input.readLong();
       final String etag = input.readUTF();
+      String checksumAlgorithm = null;
+      String checksum = null;
+      if (input.available() > 0) {
+        checksumAlgorithm = input.readUTF();
+        checksum = input.readUTF();
+      }
       if (len < 0) {
         throw new IOException("Negative length");
       }
-      return new PartHandlePayload(path, uploadId, partNumber, len, etag);
+      return new PartHandlePayload(path, uploadId, partNumber, len, etag, 
checksumAlgorithm,
+          checksum);
+    }
+  }
+
+  static Map.Entry<String, String> extractChecksum(final UploadPartResponse 
uploadPartResponse) {
+    if (uploadPartResponse.checksumCRC32() != null) {
+      return new AbstractMap.SimpleEntry<>("CRC32", 
uploadPartResponse.checksumCRC32());
+    }
+    if (uploadPartResponse.checksumCRC32C() != null) {
+      return new AbstractMap.SimpleEntry<>("CRC32C", 
uploadPartResponse.checksumCRC32C());
+    }
+    if (uploadPartResponse.checksumSHA1() != null) {
+      return new AbstractMap.SimpleEntry<>("SHA1", 
uploadPartResponse.checksumSHA1());
+    }
+    if (uploadPartResponse.checksumSHA256() != null) {
+      return new AbstractMap.SimpleEntry<>("SHA256", 
uploadPartResponse.checksumSHA256());
     }
+    return null;
   }
 
   /**
@@ -332,12 +371,18 @@ static final class PartHandlePayload {
 
     private final String etag;
 
+    private final String checksumAlgorithm;
+
+    private final String checksum;
+
     private PartHandlePayload(
         final String path,
         final String uploadId,
         final int partNumber,
         final long len,
-        final String etag) {
+        final String etag,
+        final String checksumAlgorithm,
+        final String checksum) {
       Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
           "Empty etag");
       Preconditions.checkArgument(StringUtils.isNotEmpty(path),
@@ -346,12 +391,18 @@ private PartHandlePayload(
           "Empty uploadId");
       Preconditions.checkArgument(len >= 0,
           "Invalid length");
+      Preconditions.checkArgument((StringUtils.isNotEmpty(checksumAlgorithm) &&
+              StringUtils.isNotEmpty(checksum)) ||
+              (StringUtils.isEmpty(checksumAlgorithm) && 
StringUtils.isEmpty(checksum)),
+          "Checksum algorithm and checksum should be both provided or empty");
 
       this.path = path;
       this.uploadId = uploadId;
       this.partNumber = partNumber;
       this.len = len;
       this.etag = etag;
+      this.checksumAlgorithm = checksumAlgorithm;
+      this.checksum = checksum;
     }
 
     public String getPath() {
@@ -374,6 +425,14 @@ public String getUploadId() {
       return uploadId;
     }
 
+    public String getChecksumAlgorithm() {
+      return checksumAlgorithm;
+    }
+
+    public String getChecksum() {
+      return checksum;
+    }
+
     public byte[] toBytes()
         throws IOException {
       Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
@@ -389,6 +448,10 @@ public byte[] toBytes()
         output.writeInt(partNumber);
         output.writeLong(len);
         output.writeUTF(etag);
+        if (checksumAlgorithm != null && checksum != null) {
+          output.writeUTF(checksumAlgorithm);
+          output.writeUTF(checksum);
+        }
       }
       return bytes.toByteArray();
     }
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 59c1ceac1c5..afed3397f56 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1307,6 +1307,15 @@ Here are some the S3A properties for use in production.
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.create.checksum.algorithm</name>
+  <description>
+    Indicates the algorithm used to create the checksum for the object
+    to be uploaded to S3. Unset by default. It supports the following values:
+    'CRC32', 'CRC32C', 'SHA1', and 'SHA256'.
+  </description>
+</property>
+
 <!--
 The switch to turn S3A auditing on or off.
 -->
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 1bdb2c66cb4..6520e0dc026 100644
--- 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -387,6 +387,28 @@ Happens if a multipart upload is being completed, but one 
of the parts is missin
 * A magic committer job's list of in-progress uploads somehow got corrupted
 * Bug in the S3A codebase (rare, but not impossible...)
 
+### <a name="object_lock_parameters"></a> Status Code 400 "Content-MD5 OR 
x-amz-checksum- HTTP header is required for Put Object requests with Object 
Lock parameters"
+```
+software.amazon.awssdk.services.s3.model.S3Exception: Content-MD5 OR 
x-amz-checksum- HTTP header is required for Put Object requests with Object 
Lock parameters (Service: S3, Status Code: 400, Request ID: 1122334455, 
Extended Request ID: ...):
+InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put 
Object requests with Object Lock parameters (Service: S3, Status Code: 400, 
Request ID: 1122334455, Extended Request ID: ...)
+```
+
+This error happens if the S3 bucket has [Object 
Lock](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html) 
enabled.
+
+The Content-MD5 or x-amz-sdk-checksum-algorithm header is required for any 
request to upload an object
+with a retention period configured using Object Lock.
+
+If Object Lock can't be disabled on the S3 bucket, set a checksum algorithm to 
be used for
+uploads via the `fs.s3a.create.checksum.algorithm` property. Note that 
enabling checksums on uploads can
+affect performance.
+
+```xml
+<property>
+  <name>fs.s3a.create.checksum.algorithm</name>
+  <value>SHA256</value>
+</property>
+```
+
 ## <a name="access_denied"></a> Access Denied
 
 HTTP error codes 401 and 403 are mapped to `AccessDeniedException` in the S3A 
connector.
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
new file mode 100644
index 00000000000..f477f46ceb6
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+import software.amazon.awssdk.services.s3.model.ChecksumMode;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
+
+/**
+ * Tests S3 checksum algorithm.
+ * If CHECKSUM_ALGORITHM config is not set in auth-keys.xml,
+ * SHA256 algorithm will be picked.
+ */
+public class ITestS3AChecksum extends AbstractS3ATestBase {
+
+  private static final ChecksumAlgorithm DEFAULT_CHECKSUM_ALGORITHM = 
ChecksumAlgorithm.SHA256;
+
+  private ChecksumAlgorithm checksumAlgorithm;
+
+  private static final int[] SIZES = { // lengths passed to validateChecksumForFilesize
+      1, 2, 3, 4, 5, 254, 255, 256, 257, (1 << 12) - 1 // 4095; '^' is XOR in Java, not power
+  };
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        CHECKSUM_ALGORITHM,
+        REJECT_OUT_OF_SPAN_OPERATIONS);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf);
+    if (checksumAlgorithm == null) {
+      checksumAlgorithm = DEFAULT_CHECKSUM_ALGORITHM;
+      LOG.info("No checksum algorithm found in configuration, will use default 
{}",
+          checksumAlgorithm);
+      conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString());
+    }
+    conf.setBoolean(REJECT_OUT_OF_SPAN_OPERATIONS, false);
+    return conf;
+  }
+
+  @Test
+  public void testChecksum() throws IOException {
+    for (int size : SIZES) {
+      validateChecksumForFilesize(size);
+    }
+  }
+
+  private void validateChecksumForFilesize(int len) throws IOException {
+    describe("Create a file of size " + len);
+    String src = String.format("%s-%04x", methodName.getMethodName(), len);
+    Path path = writeThenReadFile(src, len);
+    assertChecksum(path);
+    rm(getFileSystem(), path, false, false);
+  }
+
+  private void assertChecksum(Path path) throws IOException {
+    final String key = getFileSystem().pathToKey(path);
+    HeadObjectRequest.Builder requestBuilder = 
getFileSystem().getRequestFactory()
+        .newHeadObjectRequestBuilder(key)
+        .checksumMode(ChecksumMode.ENABLED);
+    HeadObjectResponse headObject = getFileSystem().getS3AInternals()
+        .getAmazonS3Client("Call head object with checksum enabled")
+        .headObject(requestBuilder.build());
+    switch (checksumAlgorithm) {
+    case CRC32:
+      Assertions.assertThat(headObject.checksumCRC32())
+          .describedAs("headObject.checksumCRC32()")
+          .isNotNull();
+      break;
+    case CRC32_C:
+      Assertions.assertThat(headObject.checksumCRC32C())
+          .describedAs("headObject.checksumCRC32C()")
+          .isNotNull();
+      break;
+    case SHA1:
+      Assertions.assertThat(headObject.checksumSHA1())
+          .describedAs("headObject.checksumSHA1()")
+          .isNotNull();
+      break;
+    case SHA256:
+      Assertions.assertThat(headObject.checksumSHA256())
+          .describedAs("headObject.checksumSHA256()")
+          .isNotNull();
+      break;
+    default:
+      fail("Checksum algorithm not supported: " + checksumAlgorithm);
+    }
+  }
+
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
index 28a443f04cd..5b8e1dc4300 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.when;
 
@@ -27,6 +27,7 @@
 import java.net.URI;
 import java.util.Date;
 
+import org.assertj.core.api.Assertions;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
@@ -86,7 +87,20 @@ public void testDeleteOnExit() throws Exception {
 
     testFs.deleteOnExit(path);
     testFs.close();
-    assertEquals(0, testFs.getDeleteOnDnExitCount());
+    Assertions.assertThat(testFs.getDeleteOnDnExitCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testCreateRequestFactoryWithInvalidChecksumAlgorithm() throws 
Exception {
+    Configuration conf = createConfiguration();
+    conf.set(Constants.CHECKSUM_ALGORITHM, "INVALID");
+    TestS3AFileSystem testFs  = new TestS3AFileSystem();
+    URI uri = URI.create(FS_S3A + "://" + BUCKET);
+    final IllegalArgumentException exception = 
intercept(IllegalArgumentException.class,
+        () -> testFs.initialize(uri, conf));
+    Assertions.assertThat(exception.getMessage())
+        .describedAs("Error message should say that INVALID is not supported")
+        .isEqualTo("Checksum algorithm is not supported: INVALID");
   }
 
   private ArgumentMatcher<HeadObjectRequest> correctGetMetadataRequest(
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
index 58bb2a5e491..e6eb60ad061 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
@@ -217,6 +217,11 @@ private Configuration createTestConfig(String identifier) {
     conf.set(TEST_ID_KEY, identifier);
     conf.set(TEST_REGION_KEY, regionName);
 
+    // Setting a checksum algorithm in this test makes requests fail with
+    // "x-amz-sdk-checksum-algorithm specified, but no corresponding
+    // x-amz-checksum-* or x-amz-trailer headers were found", so unset it.
+    conf.unset(Constants.CHECKSUM_ALGORITHM);
+
     // make absolutely sure there is no caching.
     disableFilesystemCaching(conf);
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index 9e0bdd2cd34..34b856c21b7 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
@@ -442,11 +443,11 @@ private Path validatePendingCommitData(String filename,
     Assertions.assertThat(persisted.getSaved())
         .describedAs("saved timestamp in %s", persisted)
         .isGreaterThan(0);
-    List<String> etags = persisted.getEtags();
-    Assertions.assertThat(etags)
+    List<UploadEtag> uploadEtags = persisted.getEtags();
+    Assertions.assertThat(uploadEtags)
         .describedAs("Etag list")
         .hasSize(1);
-    Assertions.assertThat(CommitOperations.toPartEtags(etags))
+    Assertions.assertThat(uploadEtags)
         .describedAs("Etags to parts")
         .hasSize(1);
     return pendingDataPath;
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java
new file mode 100644
index 00000000000..e0d19591c40
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+public class TestUploadEtag {
+
+  @Test
+  public void testFromCompletedPartCRC32() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumCRC32("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be CRC32")
+        .isEqualTo("CRC32");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartCRC32C() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumCRC32C("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be CRC32C")
+        .isEqualTo("CRC32C");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartSHA1() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumSHA1("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be SHA1")
+        .isEqualTo("SHA1");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartSHA256() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumSHA256("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be SHA256")
+        .isEqualTo("SHA256");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartNoChecksum() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("uploadEtag.getChecksumAlgorithm()")
+        .isNull();
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("uploadEtag.getChecksum()")
+        .isNull();
+  }
+
+  @Test
+  public void testToCompletedPartCRC32() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "CRC32", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 
1);
+    Assertions.assertThat(completedPart.checksumCRC32())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartCRC32C() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "CRC32C", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 
1);
+    Assertions.assertThat(completedPart.checksumCRC32C())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartSHA1() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "SHA1", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 
1);
+    Assertions.assertThat(completedPart.checksumSHA1())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartSHA256() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "SHA256", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 
1);
+    Assertions.assertThat(completedPart.checksumSHA256())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartNoChecksum() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", null, null);
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 
1);
+    Assertions.assertThat(completedPart.checksumCRC32())
+        .describedAs("completedPart.checksumCRC32()")
+        .isNull();
+    Assertions.assertThat(completedPart.checksumCRC32C())
+        .describedAs("completedPart.checksumCRC32C()")
+        .isNull();
+    Assertions.assertThat(completedPart.checksumSHA1())
+        .describedAs("completedPart.checksumSHA1()")
+        .isNull();
+    Assertions.assertThat(completedPart.checksumSHA256())
+        .describedAs("completedPart.checksumSHA256()")
+        .isNull();
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
index 28bd8b878e5..63b79c230fd 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -780,7 +780,7 @@ private static void assertValidUpload(Map<String, 
List<String>> parts,
 
     for (int i = 0; i < tags.size(); i += 1) {
       assertEquals("Should commit the correct part tags",
-          tags.get(i), commit.getEtags().get(i));
+          tags.get(i), commit.getEtags().get(i).getEtag());
     }
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
index 28161979f0b..1113b54b247 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
@@ -100,9 +101,9 @@ protected ActiveCommit listPendingUploadsToCommit(
           commit.setDestinationKey(key);
           commit.setUri("s3a://" + BUCKET + "/" + key);
           commit.setUploadId(uploadId);
-          ArrayList<String> etags = new ArrayList<>();
-          etags.add("tag1");
-          commit.setEtags(etags);
+          ArrayList<UploadEtag> uploadEtags = new ArrayList<>();
+          uploadEtags.add(new UploadEtag("tag1", null, null));
+          commit.setEtags(uploadEtags);
           pendingSet.add(commit);
           // register the upload so commit operations are not rejected
           getMockResults().addUpload(uploadId, key);
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
new file mode 100644
index 00000000000..5e8ee5d8c76
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+
+public class TestChecksumSupport {
+
+  @Test
+  public void testGetSupportedChecksumAlgorithmCRC32() {
+    testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.CRC32);
+  }
+
+  @Test
+  public void testGetSupportedChecksumAlgorithmCRC32C() {
+    testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.CRC32_C);
+  }
+
+  @Test
+  public void testGetSupportedChecksumAlgorithmSHA1() {
+    testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.SHA1);
+  }
+
+  @Test
+  public void testGetSupportedChecksumAlgorithmSHA256() {
+    testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.SHA256);
+  }
+
+  private void testGetSupportedChecksumAlgorithm(ChecksumAlgorithm 
checksumAlgorithm) {
+    final Configuration conf = new Configuration();
+    conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString());
+    Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf))
+        .describedAs("Checksum algorithm must match value set in the 
configuration")
+        .isEqualTo(checksumAlgorithm);
+  }
+
+  @Test
+  public void testGetChecksumAlgorithmWhenNull() {
+    final Configuration conf = new Configuration();
+    conf.unset(CHECKSUM_ALGORITHM);
+    Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf))
+        .describedAs("If configuration is not set, checksum algorithm must be 
null")
+        .isNull();
+  }
+
+  @Test
+  public void testGetNotSupportedChecksumAlgorithm() {
+    final Configuration conf = new Configuration();
+    conf.set(CHECKSUM_ALGORITHM, "INVALID");
+    Assertions.assertThatThrownBy(() -> 
ChecksumSupport.getChecksumAlgorithm(conf))
+        .describedAs("Invalid checksum algorithm should throw an exception")
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
index c779062f518..2285ed06c2b 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
@@ -19,22 +19,29 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.junit.Test;
 import software.amazon.awssdk.awscore.AwsRequest;
 import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.SdkRequest;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import org.assertj.core.api.Assertions;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Request;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
-
+import software.amazon.awssdk.utils.Md5Utils;
 
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
@@ -275,4 +282,77 @@ public void testUploadTimeouts() throws Throwable {
     assertApiTimeouts(partDuration, upload);
 
   }
+
+  @Test
+  public void testRequestFactoryWithChecksumAlgorithmCRC32() throws 
IOException {
+    testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.CRC32);
+  }
+
+  @Test
+  public void testRequestFactoryWithChecksumAlgorithmCRC32C() throws 
IOException {
+    testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.CRC32_C);
+  }
+
+  @Test
+  public void testRequestFactoryWithChecksumAlgorithmSHA1() throws IOException 
{
+    testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.SHA1);
+  }
+
+  @Test
+  public void testRequestFactoryWithChecksumAlgorithmSHA256() throws 
IOException {
+    testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm.SHA256);
+  }
+
+  private void testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm 
checksumAlgorithm)
+      throws IOException {
+    String path = "path";
+    String path2 = "path2";
+    HeadObjectResponse md = 
HeadObjectResponse.builder().contentLength(128L).build();
+
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withChecksumAlgorithm(checksumAlgorithm)
+        .build();
+    createFactoryObjects(factory);
+
+    final CopyObjectRequest copyObjectRequest = 
factory.newCopyObjectRequestBuilder(path,
+            path2, md).build();
+    
Assertions.assertThat(copyObjectRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+
+    final PutObjectRequest putObjectRequest = 
factory.newPutObjectRequestBuilder(path,
+        PutObjectOptions.keepingDirs(), 1024, false).build();
+    
Assertions.assertThat(putObjectRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+
+    final CreateMultipartUploadRequest multipartUploadRequest =
+        factory.newMultipartUploadRequestBuilder(path, null).build();
+    
Assertions.assertThat(multipartUploadRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+
+    final UploadPartRequest uploadPartRequest = 
factory.newUploadPartRequestBuilder(path,
+        "id", 2, true, 128_000_000).build();
+    
Assertions.assertThat(uploadPartRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+  }
+
+  @Test
+  public void testCompleteMultipartUploadRequestWithChecksumAlgorithmAndSSEC() 
throws IOException {
+    final byte[] encryptionKey = 
"encryptionKey".getBytes(StandardCharsets.UTF_8);
+    final String encryptionKeyBase64 = Base64.getEncoder()
+        .encodeToString(encryptionKey);
+    final String encryptionKeyMd5 = Md5Utils.md5AsBase64(encryptionKey);
+    final EncryptionSecrets encryptionSecrets = new 
EncryptionSecrets(S3AEncryptionMethods.SSE_C,
+        encryptionKeyBase64, null);
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withChecksumAlgorithm(ChecksumAlgorithm.CRC32_C)
+        .withEncryptionSecrets(encryptionSecrets)
+        .build();
+    createFactoryObjects(factory);
+
+    final CompleteMultipartUploadRequest request =
+        factory.newCompleteMultipartUploadRequestBuilder("path", "1", new 
ArrayList<>())
+            .build();
+    Assertions.assertThat(request.sseCustomerAlgorithm())
+        .isEqualTo(ServerSideEncryption.AES256.name());
+    
Assertions.assertThat(request.sseCustomerKey()).isEqualTo(encryptionKeyBase64);
+    
Assertions.assertThat(request.sseCustomerKeyMD5()).isEqualTo(encryptionKeyMd5);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
index 71305aa6633..afdad31440a 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
@@ -20,14 +20,18 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Map;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 
 import org.apache.hadoop.test.HadoopTestBase;
 
 import static 
org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.PartHandlePayload;
 import static 
org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.buildPartHandlePayload;
 import static 
org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.parsePartHandlePayload;
+import static 
org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.extractChecksum;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -41,35 +45,60 @@ public class TestS3AMultipartUploaderSupport extends 
HadoopTestBase {
 
   @Test
   public void testRoundTrip() throws Throwable {
-    PartHandlePayload result = roundTrip(999, "tag", 1);
+    PartHandlePayload result = roundTrip(999, "tag", 1, null, null);
     assertEquals(PATH, result.getPath());
     assertEquals(UPLOAD, result.getUploadId());
     assertEquals(999, result.getPartNumber());
     assertEquals("tag", result.getEtag());
     assertEquals(1, result.getLen());
+    Assertions.assertThat(result.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm must not be present").isNull();
+    Assertions.assertThat(result.getChecksum())
+        .describedAs("Checksum must not be generated").isNull();
   }
 
   @Test
   public void testRoundTrip2() throws Throwable {
     long len = 1L + Integer.MAX_VALUE;
     PartHandlePayload result =
-        roundTrip(1, "11223344", len);
+        roundTrip(1, "11223344", len, null, null);
     assertEquals(1, result.getPartNumber());
     assertEquals("11223344", result.getEtag());
     assertEquals(len, result.getLen());
+    Assertions.assertThat(result.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm must not be present").isNull();
+    Assertions.assertThat(result.getChecksum())
+        .describedAs("Checksum must not be generated").isNull();
+  }
+
+  @Test
+  public void testRoundTripWithChecksum() throws Throwable {
+    PartHandlePayload result = roundTrip(999, "tag", 1,
+        "SHA256", "checksum");
+    assertEquals(PATH, result.getPath());
+    assertEquals(UPLOAD, result.getUploadId());
+    assertEquals(999, result.getPartNumber());
+    assertEquals("tag", result.getEtag());
+    assertEquals(1, result.getLen());
+    Assertions.assertThat(result.getChecksumAlgorithm())
+        .describedAs("Expect the checksum algorithm to be SHA256")
+        .isEqualTo("SHA256");
+    Assertions.assertThat(result.getChecksum())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
   }
 
   @Test
   public void testNoEtag() throws Throwable {
     intercept(IllegalArgumentException.class,
         () -> buildPartHandlePayload(PATH, UPLOAD,
-            0, "", 1));
+            0, "", 1, null, null));
   }
 
   @Test
   public void testNoLen() throws Throwable {
     intercept(IllegalArgumentException.class,
-        () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1));
+        () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1, null, null));
   }
 
   @Test
@@ -80,17 +109,97 @@ public void testBadPayload() throws Throwable {
 
   @Test
   public void testBadHeader() throws Throwable {
-    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1);
+    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1, null, null);
     bytes[2] = 'f';
     intercept(IOException.class, "header",
         () -> parsePartHandlePayload(bytes));
   }
 
+  @Test
+  public void testNoChecksumAlgorithm() throws Exception {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload(PATH, UPLOAD,
+            999, "tag", 1, "", "checksum"));
+  }
+
+  @Test
+  public void testNoChecksum() throws Exception {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload(PATH, UPLOAD,
+            999, "tag", 1, "SHA256", ""));
+  }
+
+  @Test
+  public void testExtractChecksumCRC32() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumCRC32("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be CRC32")
+        .isEqualTo("CRC32");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumCRC32C() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumCRC32C("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be CRC32C")
+        .isEqualTo("CRC32C");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumSHA1() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumSHA1("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be SHA1")
+        .isEqualTo("SHA1");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumSHA256() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumSHA256("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be SHA256")
+        .isEqualTo("SHA256");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumEmpty() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder().build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    assertNull(checksum);
+  }
+
   private PartHandlePayload roundTrip(
       int partNumber,
       String tag,
-      long len) throws IOException {
-    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len);
+      long len,
+      String checksumAlgorithm,
+      String checksum) throws IOException {
+    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len,
+        checksumAlgorithm, checksum);
     return parsePartHandlePayload(bytes);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to