This is an automated email from the ASF dual-hosted git repository.
ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 14e4e1f1cd7 HDDS-13668. Support S3 signed single chunk payload
verification (#9294)
14e4e1f1cd7 is described below
commit 14e4e1f1cd7ad51e279ee672862b1790c2b6dc4d
Author: Han-Wen Hsu <[email protected]>
AuthorDate: Fri Jan 2 13:23:37 2026 +0800
HDDS-13668. Support S3 signed single chunk payload verification (#9294)
---
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 13 ++
.../ozone/client/io/KeyDataStreamOutput.java | 12 +
.../hadoop/ozone/client/io/KeyOutputStream.java | 10 +
.../src/main/smoketest/s3/presigned_url_helper.py | 73 +++++++
.../dist/src/main/smoketest/s3/presignedurl.robot | 54 +++++
hadoop-ozone/integration-test-s3/pom.xml | 5 +
.../ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java | 35 +++
.../ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java | 36 +++
.../hadoop/ozone/s3/MultiDigestInputStream.java | 243 +++++++++++++++++++++
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 86 ++++++--
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 39 +++-
.../hadoop/ozone/s3/exception/OS3Exception.java | 2 +-
.../hadoop/ozone/s3/exception/S3ErrorTable.java | 4 +
.../ozone/s3/signature/StringToSignProducer.java | 2 +-
.../ozone/s3/TestMultiDigestInputStream.java | 196 +++++++++++++++++
.../hadoop/ozone/s3/endpoint/TestObjectGet.java | 2 +-
.../hadoop/ozone/s3/endpoint/TestObjectPut.java | 2 +-
.../ozone/s3/endpoint/TestObjectTaggingDelete.java | 2 +-
.../ozone/s3/endpoint/TestObjectTaggingGet.java | 2 +-
.../ozone/s3/endpoint/TestObjectTaggingPut.java | 2 +-
.../hadoop/ozone/s3/endpoint/TestPartUpload.java | 5 +
.../ozone/s3/endpoint/TestUploadWithStream.java | 2 +-
.../ozone/s3/metrics/TestS3GatewayMetrics.java | 3 +-
23 files changed, 790 insertions(+), 40 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index a3b6e128132..ee5c7548757 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -19,10 +19,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -47,6 +49,7 @@
import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +89,13 @@ public final class ECKeyOutputStream extends KeyOutputStream
// how much data has been ingested into the stream
private long writeOffset;
+ private List<CheckedRunnable<IOException>> preCommits =
Collections.emptyList();
+
+ @Override
+ public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>>
preCommits) {
+ this.preCommits = preCommits;
+ }
+
@VisibleForTesting
public void insertFlushCheckpoint(long version) throws IOException {
addStripeToQueue(new CheckpointDummyStripe(version));
@@ -485,6 +495,9 @@ public void close() throws IOException {
"Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
+ for (CheckedRunnable<IOException> preCommit : preCommits) {
+ preCommit.run();
+ }
blockOutputStreamEntryPool.commitKey(offset);
}
} catch (ExecutionException e) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index f9a47a9f55e..fffe6e6e81d 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -19,9 +19,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -44,6 +46,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +85,12 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
*/
private boolean atomicKeyCreation;
+ private List<CheckedRunnable<IOException>> preCommits =
Collections.emptyList();
+
+ public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>>
preCommits) {
+ this.preCommits = preCommits;
+ }
+
@VisibleForTesting
public List<BlockDataStreamOutputEntry> getStreamEntries() {
return blockDataStreamOutputEntryPool.getStreamEntries();
@@ -431,6 +440,9 @@ public void close() throws IOException {
String.format("Expected: %d and actual %d write sizes do not
match",
expectedSize, offset));
}
+ for (CheckedRunnable<IOException> preCommit : preCommits) {
+ preCommit.run();
+ }
blockDataStreamOutputEntryPool.commitKey(offset);
} finally {
blockDataStreamOutputEntryPool.cleanup();
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 778ac7e2f4f..2f9edfa94ea 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -19,10 +19,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -110,6 +112,11 @@ public class KeyOutputStream extends OutputStream
private final int maxConcurrentWritePerKey;
private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
+ private List<CheckedRunnable<IOException>> preCommits =
Collections.emptyList();
+
+ public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>>
preCommits) {
+ this.preCommits = preCommits;
+ }
@VisibleForTesting
KeyOutputStreamSemaphore getRequestSemaphore() {
@@ -655,6 +662,9 @@ private void closeInternal() throws IOException {
String.format("Expected: %d and actual %d write sizes do not
match",
expectedSize, offset));
}
+ for (CheckedRunnable<IOException> preCommit : preCommits) {
+ preCommit.run();
+ }
blockOutputStreamEntryPool.commitKey(offset);
} finally {
blockOutputStreamEntryPool.cleanup();
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/presigned_url_helper.py
b/hadoop-ozone/dist/src/main/smoketest/s3/presigned_url_helper.py
new file mode 100644
index 00000000000..8b5cef974f5
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/presigned_url_helper.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+
+
+def generate_presigned_put_object_url(
+ aws_access_key_id=None,
+ aws_secret_access_key=None,
+ bucket_name=None,
+ object_key=None,
+ region_name='us-east-1',
+ expiration=3600,
+ content_type=None,
+ endpoint_url=None,
+):
+ """
+ Generate a presigned URL for PUT Object. This function creates the S3 client
internally.
+ """
+ try:
+ import boto3
+
+ client_args = {
+ 'service_name': 's3',
+ 'region_name': region_name,
+ }
+
+ if aws_access_key_id and aws_secret_access_key:
+ client_args['aws_access_key_id'] = aws_access_key_id
+ client_args['aws_secret_access_key'] = aws_secret_access_key
+
+ if endpoint_url:
+ client_args['endpoint_url'] = endpoint_url
+
+ s3_client = boto3.client(**client_args)
+
+ params = {
+ 'Bucket': bucket_name,
+ 'Key': object_key,
+ }
+
+ if content_type:
+ params['ContentType'] = content_type
+
+ presigned_url = s3_client.generate_presigned_url(
+ ClientMethod='put_object',
+ Params=params,
+ ExpiresIn=expiration
+ )
+
+ return presigned_url
+
+ except Exception as e:
+ raise Exception(f"Failed to generate presigned URL: {str(e)}")
+
+
+def compute_sha256_file(path):
+ """Compute SHA256 hex digest for the entire file content at path."""
+ with open(path, 'rb') as f:
+ return hashlib.sha256(f.read()).hexdigest()
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/presignedurl.robot
b/hadoop-ozone/dist/src/main/smoketest/s3/presignedurl.robot
new file mode 100644
index 00000000000..8cc4ff6cbf7
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/presignedurl.robot
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation S3 gateway test with aws cli
+Library OperatingSystem
+Library String
+Library ./presigned_url_helper.py
+Resource ../commonlib.robot
+Resource commonawslib.robot
+Test Timeout 5 minutes
+Suite Setup Setup s3 tests
+
+*** Variables ***
+${ENDPOINT_URL} http://s3g:9878
+${OZONE_TEST} true
+${BUCKET} generated
+
+*** Test Cases ***
+Presigned URL PUT Object
+ [Documentation] Test presigned URL PUT object
+ Execute echo "Randomtext" > /tmp/testfile
+ ${ACCESS_KEY} = Execute aws configure get aws_access_key_id
+ ${SECRET_ACCESS_KEY} = Execute aws configure get aws_secret_access_key
+ ${presigned_url}= Generate Presigned Put Object Url
${ACCESS_KEY} ${SECRET_ACCESS_KEY} ${BUCKET} test-presigned-put
us-east-1 3600 ${EMPTY} ${ENDPOINT_URL}
+ ${SHA256} = Compute Sha256 File /tmp/testfile
+ ${result} = Execute curl -X PUT -T "/tmp/testfile" -H
"x-amz-content-sha256: ${SHA256}" "${presigned_url}"
+ Should Not Contain ${result} Error
+ ${head_result} = Execute AWSS3ApiCli head-object --bucket
${BUCKET} --key test-presigned-put
+ Should Not Contain ${head_result} Error
+
+Presigned URL PUT Object using wrong x-amz-content-sha256
+ [Documentation] Test presigned URL PUT object with wrong
x-amz-content-sha256
+ Execute echo "Randomtext" > /tmp/testfile
+ ${ACCESS_KEY} = Execute aws configure get aws_access_key_id
+ ${SECRET_ACCESS_KEY} = Execute aws configure get aws_secret_access_key
+ ${presigned_url}= Generate Presigned Put Object Url
${ACCESS_KEY} ${SECRET_ACCESS_KEY} ${BUCKET}
test-presigned-put-wrong-sha us-east-1 3600 ${EMPTY} ${ENDPOINT_URL}
+ ${result} = Execute curl -X PUT -T "/tmp/testfile" -H
"x-amz-content-sha256: wronghash" "${presigned_url}"
+ Should Contain ${result} The provided 'x-amz-content-sha256'
header does not match the computed hash.
+ ${head_result} = Execute AWSS3APICli and ignore error
head-object --bucket ${BUCKET} --key test-presigned-put-wrong-sha
+ Should contain ${head_result} 404
+ Should contain ${head_result} Not Found
diff --git a/hadoop-ozone/integration-test-s3/pom.xml
b/hadoop-ozone/integration-test-s3/pom.xml
index 55864a27902..46b3cbc6513 100644
--- a/hadoop-ozone/integration-test-s3/pom.xml
+++ b/hadoop-ozone/integration-test-s3/pom.xml
@@ -66,6 +66,11 @@
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kerby</groupId>
+ <artifactId>kerby-util</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-common</artifactId>
diff --git
a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
index 5e200e1350a..c872d19f527 100644
---
a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
+++
b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
@@ -1152,6 +1152,41 @@ public void testPresignedUrlPutObject() throws Exception
{
}
}
+ @Test
+ public void testPresignedUrlPutSingleChunkWithWrongSha256() throws
Exception {
+ final String keyName = getKeyName();
+
+ // Test PutObjectRequest presigned URL
+ GeneratePresignedUrlRequest generatePresignedUrlRequest =
+ new GeneratePresignedUrlRequest(BUCKET_NAME,
keyName).withMethod(HttpMethod.PUT).withExpiration(expiration);
+ URL presignedUrl =
s3Client.generatePresignedUrl(generatePresignedUrlRequest);
+
+ Map<String, List<String>> headers = new HashMap<>();
+ List<String> sha256Value = new ArrayList<>();
+ sha256Value.add("wrong-sha256-value");
+ headers.put("x-amz-content-sha256", sha256Value);
+
+ HttpURLConnection connection = null;
+ try {
+ connection = S3SDKTestUtils.openHttpURLConnection(presignedUrl, "PUT",
+ headers, CONTENT.getBytes(StandardCharsets.UTF_8));
+ int responseCode = connection.getResponseCode();
+ assertEquals(400, responseCode, "PutObject presigned URL should return
400 because of wrong SHA256");
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+
+ // Verify the object was not uploaded
+ AmazonServiceException ase = assertThrows(AmazonServiceException.class,
+ () -> s3Client.getObject(BUCKET_NAME, keyName));
+
+ assertEquals(ErrorType.Client, ase.getErrorType());
+ assertEquals(404, ase.getStatusCode());
+ assertEquals("NoSuchKey", ase.getErrorCode());
+ }
+
@Test
public void testPresignedUrlMultipartUpload(@TempDir Path tempDir) throws
Exception {
final String keyName = getKeyName();
diff --git
a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
index 09026dcb918..73dac51346d 100644
---
a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
+++
b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
@@ -67,6 +67,7 @@
import org.apache.hadoop.ozone.s3.S3ClientFactory;
import org.apache.hadoop.ozone.s3.awssdk.S3SDKTestUtils;
import org.apache.hadoop.ozone.s3.endpoint.S3Owner;
+import org.apache.hadoop.ozone.s3.util.S3Consts;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.NonHATests;
import org.apache.ozone.test.OzoneTestBase;
@@ -645,6 +646,41 @@ public void testPresignedUrlPut() throws Exception {
assertEquals(CONTENT, actualContent);
}
+ @Test
+ public void testPresignedUrlPutSingleChunkWithWrongSha256() throws
Exception {
+ final String keyName = getKeyName();
+
+ PutObjectRequest objectRequest =
PutObjectRequest.builder().bucket(BUCKET_NAME).key(keyName).build();
+
+ PutObjectPresignRequest presignRequest =
PutObjectPresignRequest.builder()
+ .signatureDuration(duration)
+ .putObjectRequest(objectRequest)
+ .build();
+
+ PresignedPutObjectRequest presignedRequest =
presigner.presignPutObject(presignRequest);
+
+ Map<String, List<String>> headers = presignedRequest.signedHeaders();
+ List<String> sha256 = new ArrayList<>();
+ sha256.add("wrong-sha256-value");
+ headers.put(S3Consts.X_AMZ_CONTENT_SHA256, sha256);
+
+ // use http url connection
+ HttpURLConnection connection = null;
+ try {
+ connection =
S3SDKTestUtils.openHttpURLConnection(presignedRequest.url(), "PUT",
+ headers, CONTENT.getBytes(StandardCharsets.UTF_8));
+ int responseCode = connection.getResponseCode();
+ assertEquals(400, responseCode, "PutObject presigned URL should return
400 because of wrong SHA256");
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+
+ // Verify the object was not uploaded
+ assertThrows(NoSuchKeyException.class, () -> s3Client.headObject(b ->
b.bucket(BUCKET_NAME).key(keyName)));
+ }
+
@Test
public void testPresignedUrlMultipartUpload(@TempDir Path tempDir) throws
Exception {
final String keyName = getKeyName();
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/MultiDigestInputStream.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/MultiDigestInputStream.java
new file mode 100644
index 00000000000..587cbec0516
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/MultiDigestInputStream.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An InputStream that computes multiple message digests simultaneously
+ * as data is read from the underlying stream.
+ *
+ * <p>
+ * This class extends {@link FilterInputStream} and allows multiple digest
+ * algorithms (for example, MD5 or SHA-256) to be computed in a single pass
+ * over the data. This is more efficient than reading the stream multiple
+ * times when multiple digests are required.
+ * </p>
+ *
+ * <p>Important note about relationship to {@code DigestInputStream}:</p>
+ * <ul>
+ * <li>This class is conceptually similar to {@link
java.security.DigestInputStream}.
+ * Several methods (notably {@link #read()} , {@link #read(byte[], int,
int)} and
+ * {@link #on(boolean)}) follow the same behavior and semantics as in
+ * {@code DigestInputStream} and are documented here with that intent.
+ * </li>
+ * <li>Where method signatures differ from {@code DigestInputStream} (for
+ * example {@link #getMessageDigest(String)} which takes an algorithm
name
+ * and returns the corresponding digest), the difference is explicitly
+ * documented on the method itself.</li>
+ * </ul>
+ *
+ * <p>Example usage:</p>
+ * <pre>
+ * MessageDigest md5 = MessageDigest.getInstance("MD5");
+ * MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
+ * MultiDigestInputStream mdis = new MultiDigestInputStream(inputStream, md5,
sha256);
+ * // Read from mdis (reads will update all registered digests while 'on' is
true)
+ * byte[] md5Hash = mdis.getMessageDigest("MD5").digest();
+ * byte[] sha256Hash = mdis.getMessageDigest("SHA-256").digest();
+ * </pre>
+ *
+ * <p>Notes:</p>
+ * <ul>
+ * <li>The constructor accepts one or more already-created {@link
MessageDigest}
+ * instances; the digests are kept and updated as data is read.</li>
+ * <li>Call {@link #on(boolean)} with {@code false} to temporarily disable
+ * digest updates (for example, to skip computing during certain reads),
+ * and {@code true} to re-enable. This behavior mirrors
+ * {@link java.security.DigestInputStream#on(boolean)}.</li>
+ * <li>{@link #getAllDigests()} returns a copy of the internal digest
map.</li>
+ * </ul>
+ *
+ * @see java.security.DigestInputStream
+ */
+public class MultiDigestInputStream extends FilterInputStream {
+
+ private final Map<String, MessageDigest> digests;
+ private boolean on = true;
+
+ /**
+ * Creates a MultiDigestInputStream with the specified digests.
+ *
+ * @param in the underlying input stream
+ * @param inputDigests the message digest instances to compute (may be
zero-length)
+ */
+ public MultiDigestInputStream(InputStream in, Collection<MessageDigest>
inputDigests) {
+ super(in);
+ this.digests = new HashMap<>();
+ for (MessageDigest digest : inputDigests) {
+ digests.put(digest.getAlgorithm(), digest);
+ }
+ }
+
+ /**
+ * Reads the next byte of data from the input stream. If a byte is read and
+ * digest updates are enabled (see {@link #on(boolean)}), the byte is
+ * supplied to all registered digests.
+ *
+ * @return the next byte of data, or -1 if the end of the stream is reached
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public int read() throws IOException {
+ int ch = in.read();
+ if (ch != -1) {
+ updateDigests((byte) ch);
+ }
+ return ch;
+ }
+
+ /**
+ * Reads up to {@code len} bytes of data into an array of bytes from the
+ * input stream. If bytes are read and digest updates are enabled, the
+ * read bytes are supplied to all registered digests.
+ *
+ * @param b the buffer into which the data is read
+ * @param off the start offset in array {@code b} at which the data is
written
+ * @param len the maximum number of bytes to read
+ * @return the total number of bytes read into the buffer, or -1 if there is
+ * no more data because the end of the stream has been reached
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int bytesRead = in.read(b, off, len);
+ if (bytesRead > 0) {
+ updateDigests(b, off, bytesRead);
+ }
+ return bytesRead;
+ }
+
+ private void updateDigests(byte b) {
+ if (!on) {
+ return;
+ }
+ for (MessageDigest digest : digests.values()) {
+ digest.update(b);
+ }
+ }
+
+ private void updateDigests(byte[] b, int off, int len) {
+ if (!on) {
+ return;
+ }
+ for (MessageDigest digest : digests.values()) {
+ digest.update(b, off, len);
+ }
+ }
+
+ /**
+ * Gets the {@link MessageDigest} instance for the specified algorithm.
+ *
+ * <p>Note: {@code DigestInputStream#getMessageDigest()} returns
+ * the single digest instance associated with that stream. This class may
+ * manage multiple digests; therefore this method accepts an algorithm name
+ * and returns the corresponding {@link MessageDigest} or {@code null} if not
+ * registered.
+ *
+ * @param algorithm the digest algorithm name (for example, "MD5" or
"SHA-256")
+ * @return the MessageDigest instance for the specified algorithm,
+ * or {@code null} if the algorithm was not registered
+ * @see java.security.DigestInputStream#getMessageDigest()
+ */
+ public MessageDigest getMessageDigest(String algorithm) {
+ return digests.get(algorithm);
+ }
+
+ /**
+ * Returns a copy of the map of all digests being computed.
+ * Modifications to the returned map do not affect the stream's internal
state.
+ *
+ * @return a shallow copy of the digests map (algorithm name to
MessageDigest)
+ */
+ public Map<String, MessageDigest> getAllDigests() {
+ return new HashMap<>(digests);
+ }
+
+ /**
+ * Resets all message digests by calling {@link MessageDigest#reset()} on
each
+ * registered digest.
+ */
+ public void resetDigests() {
+ for (MessageDigest digest : digests.values()) {
+ digest.reset();
+ }
+ }
+
+ /**
+ * Enable or disable updating of the registered digests while reading.
+ *
+ * @param enabled true to turn the digest function on, false to turn it off
+ */
+ public void on(boolean enabled) {
+ this.on = enabled;
+ }
+
+ /**
+ * Associates the given MessageDigest with the specified algorithm name,
+ * replacing any existing digest for that algorithm.
+ *
+ * @param algorithm the digest algorithm name
+ * @param digest the MessageDigest instance to set
+ */
+ public void setMessageDigest(String algorithm, MessageDigest digest) {
+ digests.put(algorithm, digest);
+ }
+
+ /**
+ * Adds a new message digest algorithm to be computed. If the algorithm name
+ * already exists in the map, it will be replaced by the newly created
+ * MessageDigest instance.
+ *
+ * @param algorithm the digest algorithm name
+ * @throws NoSuchAlgorithmException if the algorithm is not available
+ */
+ public void addMessageDigest(String algorithm)
+ throws NoSuchAlgorithmException {
+ digests.put(algorithm, MessageDigest.getInstance(algorithm));
+ }
+
+ /**
+ * Removes and returns the message digest instance for the specified
+ * algorithm name.
+ *
+ * @param algorithm the digest algorithm name to remove
+ * @return the removed MessageDigest, or {@code null} if not found
+ */
+ public MessageDigest removeMessageDigest(String algorithm) {
+ return digests.remove(algorithm);
+ }
+
+ /**
+ * Returns a string representation of this stream and its message digests.
+ *
+ * @return a string representation of the object
+ */
+ @Override
+ public String toString() {
+ return getClass().getName() + " [on=" + on + ", algorithms="
+ + digests.keySet() + "]";
+ }
+}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index c6a2b653909..45e20230337 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -74,6 +74,8 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -124,6 +126,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
import org.apache.hadoop.ozone.s3.UnsignedChunksInputStream;
import org.apache.hadoop.ozone.s3.endpoint.S3Tagging.Tag;
@@ -139,6 +142,7 @@
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import org.apache.http.HttpStatus;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,6 +159,7 @@ public class ObjectEndpoint extends EndpointBase {
LoggerFactory.getLogger(ObjectEndpoint.class);
private static final ThreadLocal<MessageDigest> E_TAG_PROVIDER;
+ private static final ThreadLocal<MessageDigest> SHA_256_PROVIDER;
static {
E_TAG_PROVIDER = ThreadLocal.withInitial(() -> {
@@ -164,11 +169,19 @@ public class ObjectEndpoint extends EndpointBase {
throw new RuntimeException(e);
}
});
+
+ SHA_256_PROVIDER = ThreadLocal.withInitial(() -> {
+ try {
+ return MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
/*FOR the feature Overriding Response Header
https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
- private Map<String, String> overrideQueryParameter;
+ private final Map<String, String> overrideQueryParameter;
private int bufferSize;
private int chunkSize;
private boolean datastreamEnabled;
@@ -226,7 +239,7 @@ public Response put(
PerformanceStringBuilder perf = new PerformanceStringBuilder();
String copyHeader = null, storageType = null, storageConfig = null;
- DigestInputStream digestInputStream = null;
+ MultiDigestInputStream multiDigestInputStream = null;
try {
if (aclMarker != null) {
s3GAction = S3GAction.PUT_OBJECT_ACL;
@@ -302,7 +315,7 @@ public Response put(
// Normal put object
S3ChunkInputStreamInfo chunkInputStreamInfo =
getS3ChunkInputStreamInfo(body,
length, amzDecodedLength, keyPath);
- digestInputStream = chunkInputStreamInfo.getDigestInputStream();
+ multiDigestInputStream =
chunkInputStreamInfo.getMultiDigestInputStream();
length = chunkInputStreamInfo.getEffectiveLength();
Map<String, String> customMetadata =
@@ -315,22 +328,37 @@ public Response put(
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
- customMetadata, tags, digestInputStream, perf);
+ customMetadata, tags, multiDigestInputStream, getHeaders(),
signatureInfo.isSignPayload(), perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
+ final String amzContentSha256Header =
+ validateSignatureHeader(getHeaders(), keyPath,
signatureInfo.isSignPayload());
try (OzoneOutputStream output = getClientProtocol().createKey(
volume.getName(), bucketName, keyPath, length, replicationConfig,
customMetadata, tags)) {
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
- putLength = IOUtils.copyLarge(digestInputStream, output, 0, length,
+ putLength = IOUtils.copyLarge(multiDigestInputStream, output, 0,
length,
new byte[getIOBufferSize(length)]);
eTag = DatatypeConverter.printHexBinary(
- digestInputStream.getMessageDigest().digest())
+
multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest())
.toLowerCase();
output.getMetadata().put(OzoneConsts.ETAG, eTag);
+
+ // If sha256Digest exists, this request must validate
x-amz-content-sha256
+ MessageDigest sha256Digest =
multiDigestInputStream.getMessageDigest(OzoneConsts.FILE_HASH);
+ if (sha256Digest != null) {
+ final String actualSha256 = DatatypeConverter.printHexBinary(
+ sha256Digest.digest()).toLowerCase();
+ CheckedRunnable<IOException> preCommit = () -> {
+ if (!amzContentSha256Header.equals(actualSha256)) {
+ throw
S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
+ }
+ };
+
output.getKeyOutputStream().setPreCommits(Collections.singletonList(preCommit));
+ }
}
}
getMetrics().incPutKeySuccessLength(putLength);
@@ -383,8 +411,8 @@ public Response put(
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
- if (digestInputStream != null) {
- digestInputStream.getMessageDigest().reset();
+ if (multiDigestInputStream != null) {
+ multiDigestInputStream.resetDigests();
}
if (auditSuccess) {
long opLatencyNs =
getMetrics().updateCreateKeySuccessStats(startNanos);
@@ -941,13 +969,13 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
String copyHeader = null;
- DigestInputStream digestInputStream = null;
+ MultiDigestInputStream multiDigestInputStream = null;
final String bucketName = ozoneBucket.getName();
try {
String amzDecodedLength =
getHeaders().getHeaderString(DECODED_CONTENT_LENGTH_HEADER);
S3ChunkInputStreamInfo chunkInputStreamInfo = getS3ChunkInputStreamInfo(
body, length, amzDecodedLength, key);
- digestInputStream = chunkInputStreamInfo.getDigestInputStream();
+ multiDigestInputStream =
chunkInputStreamInfo.getMultiDigestInputStream();
length = chunkInputStreamInfo.getEffectiveLength();
copyHeader = getHeaders().getHeaderString(COPY_SOURCE_HEADER);
@@ -967,7 +995,7 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
perf.appendStreamMode();
return ObjectEndpointStreaming
.createMultipartKey(ozoneBucket, key, length, partNumber,
- uploadID, chunkSize, digestInputStream, perf);
+ uploadID, chunkSize, multiDigestInputStream, perf);
}
// OmMultipartCommitUploadPartInfo can only be gotten after the
// OzoneOutputStream is closed, so we need to save the OzoneOutputStream
@@ -1044,9 +1072,9 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
- putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream,
0, length,
+ putLength = IOUtils.copyLarge(multiDigestInputStream,
ozoneOutputStream, 0, length,
new byte[getIOBufferSize(length)]);
- byte[] digest = digestInputStream.getMessageDigest().digest();
+ byte[] digest =
multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest();
ozoneOutputStream.getMetadata()
.put(OzoneConsts.ETAG,
DatatypeConverter.printHexBinary(digest).toLowerCase());
outputStream = ozoneOutputStream;
@@ -1095,8 +1123,8 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
- if (digestInputStream != null) {
- digestInputStream.getMessageDigest().reset();
+ if (multiDigestInputStream != null) {
+ multiDigestInputStream.resetDigests();
}
}
}
@@ -1475,6 +1503,11 @@ public MessageDigest getMessageDigestInstance() {
return E_TAG_PROVIDER.get();
}
+ @VisibleForTesting
+ public MessageDigest getSha256DigestInstance() {
+ return SHA_256_PROVIDER.get();
+ }
+
private String extractPartsCount(String eTag) {
if (eTag.contains("-")) {
String[] parts = eTag.replace("\"", "").split("-");
@@ -1524,23 +1557,30 @@ private S3ChunkInputStreamInfo
getS3ChunkInputStreamInfo(
effectiveLength = contentLength;
}
- // DigestInputStream is used for ETag calculation
- DigestInputStream digestInputStream = new
DigestInputStream(chunkInputStream, getMessageDigestInstance());
- return new S3ChunkInputStreamInfo(digestInputStream, effectiveLength);
+ // MessageDigest is used for ETag calculation
+ // and Sha256Digest is used for "x-amz-content-sha256" header verification
+ List<MessageDigest> digests = new ArrayList<>();
+ digests.add(getMessageDigestInstance());
+ if (!hasUnsignedPayload(amzContentSha256Header) &&
!hasMultiChunksPayload(amzContentSha256Header)) {
+ digests.add(getSha256DigestInstance());
+ }
+ MultiDigestInputStream multiDigestInputStream =
+ new MultiDigestInputStream(chunkInputStream, digests);
+ return new S3ChunkInputStreamInfo(multiDigestInputStream, effectiveLength);
}
@Immutable
static final class S3ChunkInputStreamInfo {
- private final DigestInputStream digestInputStream;
+ private final MultiDigestInputStream multiDigestInputStream;
private final long effectiveLength;
- S3ChunkInputStreamInfo(DigestInputStream digestInputStream, long
effectiveLength) {
- this.digestInputStream = digestInputStream;
+ S3ChunkInputStreamInfo(MultiDigestInputStream multiDigestInputStream, long
effectiveLength) {
+ this.multiDigestInputStream = multiDigestInputStream;
this.effectiveLength = effectiveLength;
}
- public DigestInputStream getDigestInputStream() {
- return digestInputStream;
+ public MultiDigestInputStream getMultiDigestInputStream() {
+ return multiDigestInputStream;
}
public long getEffectiveLength() {
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index 186719c2b78..8773bf3ca68 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -20,12 +20,15 @@
import static
org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader;
import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
@@ -38,10 +41,12 @@
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.om.OmConfig;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,13 +67,14 @@ public static Pair<String, Long> put(
OzoneBucket bucket, String keyPath,
long length, ReplicationConfig replicationConfig,
int chunkSize, Map<String, String> keyMetadata,
- Map<String, String> tags,
- DigestInputStream body, PerformanceStringBuilder perf)
+ Map<String, String> tags, MultiDigestInputStream body,
+ HttpHeaders headers, boolean isSignedPayload,
+ PerformanceStringBuilder perf)
throws IOException, OS3Exception {
try {
return putKeyWithStream(bucket, keyPath,
- length, chunkSize, replicationConfig, keyMetadata, tags, body, perf);
+ length, chunkSize, replicationConfig, keyMetadata, tags, body,
headers, isSignedPayload, perf);
} catch (IOException ex) {
LOG.error("Exception occurred in PutObject", ex);
if (ex instanceof OMException) {
@@ -100,19 +106,36 @@ public static Pair<String, Long> putKeyWithStream(
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
Map<String, String> tags,
- DigestInputStream body, PerformanceStringBuilder perf)
- throws IOException {
+ MultiDigestInputStream body,
+ HttpHeaders headers,
+ boolean isSignedPayload,
+ PerformanceStringBuilder perf)
+ throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
+ final String amzContentSha256Header = validateSignatureHeader(headers,
keyPath, isSignedPayload);
long writeLen;
String eTag;
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
length, replicationConfig, keyMetadata, tags)) {
long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
- eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
+ eTag =
DatatypeConverter.printHexBinary(body.getMessageDigest(OzoneConsts.MD5_HASH).digest())
.toLowerCase();
perf.appendMetaLatencyNanos(metadataLatencyNs);
((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG,
eTag);
+
+ // If sha256Digest exists, this request must validate
x-amz-content-sha256
+ MessageDigest sha256Digest =
body.getMessageDigest(OzoneConsts.FILE_HASH);
+ if (sha256Digest != null) {
+ final String actualSha256 = DatatypeConverter.printHexBinary(
+ sha256Digest.digest()).toLowerCase();
+ CheckedRunnable<IOException> preCommit = () -> {
+ if (!amzContentSha256Header.equals(actualSha256)) {
+ throw
S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
+ }
+ };
+
streamOutput.getKeyDataStreamOutput().setPreCommits(Collections.singletonList(preCommit));
+ }
}
return Pair.of(eTag, writeLen);
}
@@ -163,7 +186,7 @@ private static long
writeToStreamOutput(OzoneDataStreamOutput streamOutput,
@SuppressWarnings("checkstyle:ParameterNumber")
public static Response createMultipartKey(OzoneBucket ozoneBucket, String
key,
long length, int partNumber, String uploadID, int chunkSize,
- DigestInputStream body, PerformanceStringBuilder perf)
+ MultiDigestInputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
String eTag;
@@ -174,7 +197,7 @@ public static Response createMultipartKey(OzoneBucket
ozoneBucket, String key,
long putLength =
writeToStreamOutput(streamOutput, body, chunkSize, length);
eTag = DatatypeConverter.printHexBinary(
- body.getMessageDigest().digest()).toLowerCase();
+
body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase();
((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG,
eTag);
METRICS.incPutKeySuccessLength(putLength);
perf.appendMetaLatencyNanos(metadataLatencyNs);
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
index 00b36427d43..f93f4a7a4d7 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
@@ -36,7 +36,7 @@
*/
@XmlRootElement(name = "Error")
@XmlAccessorType(XmlAccessType.NONE)
-public class OS3Exception extends Exception {
+public class OS3Exception extends RuntimeException {
private static final Logger LOG =
LoggerFactory.getLogger(OS3Exception.class);
private static ObjectMapper mapper;
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
index 060ed83d1bc..434087da746 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
@@ -160,6 +160,10 @@ public final class S3ErrorTable {
"Access Denied", "User doesn't have permission to access this resource
due to a " +
"bucket ownership mismatch.", HTTP_FORBIDDEN);
+ public static final OS3Exception X_AMZ_CONTENT_SHA256_MISMATCH = new
OS3Exception(
+ "XAmzContentSHA256Mismatch", "The provided 'x-amz-content-sha256' header
does " +
+ "not match the computed hash.", HTTP_BAD_REQUEST);
+
private static Function<Exception, OS3Exception> generateInternalError =
e -> new OS3Exception("InternalError", e.getMessage(),
HTTP_INTERNAL_ERROR);
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
index e2f8d64a4d1..233a001400e 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
@@ -337,7 +337,7 @@ static void validateSignedHeader(
}
break;
case X_AMZ_CONTENT_SHA256:
- // TODO: Construct request payload and match HEX(SHA256(requestPayload))
+ // Validate x-amz-content-sha256 during upload, before committing the
key.
break;
default:
break;
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestMultiDigestInputStream.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestMultiDigestInputStream.java
new file mode 100644
index 00000000000..cd83d5d4900
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestMultiDigestInputStream.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test {@link MultiDigestInputStream}.
+ */
+public class TestMultiDigestInputStream {
+
+ private static final String TEST_DATA = "1234567890";
+
+ static Stream<Arguments> algorithmAndDataTestCases() throws Exception {
+ return Stream.of(
+ // Empty stream
+ Arguments.of("empty stream with MD5",
+ Arrays.asList(MessageDigest.getInstance("MD5")), ""),
+ Arguments.of("empty stream with multiple algorithms",
+ Arrays.asList(MessageDigest.getInstance("MD5"),
+ MessageDigest.getInstance("SHA-256")), ""),
+ // Normal data
+ Arguments.of("MD5",
+ Arrays.asList(MessageDigest.getInstance("MD5")), TEST_DATA),
+ Arguments.of("MD5 and SHA-256",
+ Arrays.asList(MessageDigest.getInstance("MD5"),
+ MessageDigest.getInstance("SHA-256")), TEST_DATA),
+ Arguments.of("MD5, SHA-1 and SHA-256",
+ Arrays.asList(MessageDigest.getInstance("MD5"),
+ MessageDigest.getInstance("SHA-1"),
+ MessageDigest.getInstance("SHA-256")), TEST_DATA)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("algorithmAndDataTestCases")
+ void testRead(String testName, List<MessageDigest> digests, String data)
throws Exception {
+ byte[] dataBytes = data.getBytes(UTF_8);
+
+ try (MultiDigestInputStream mdis = new MultiDigestInputStream(
+ new ByteArrayInputStream(dataBytes), digests)) {
+ String result = IOUtils.toString(mdis, UTF_8);
+ assertEquals(data, result);
+
+ for (MessageDigest digest : digests) {
+ String algorithm = digest.getAlgorithm();
+ byte[] expectedDigest =
MessageDigest.getInstance(algorithm).digest(dataBytes);
+ assertArrayEquals(expectedDigest,
mdis.getMessageDigest(algorithm).digest());
+ }
+ }
+ }
+
+ @Test
+ void testOnOffFunctionality() throws Exception {
+ byte[] data = TEST_DATA.getBytes(UTF_8);
+
+ try (MultiDigestInputStream mdis = new MultiDigestInputStream(new
ByteArrayInputStream(data),
+ Collections.singletonList(MessageDigest.getInstance("MD5")))) {
+
+ mdis.on(false);
+
+ String result = IOUtils.toString(mdis, UTF_8);
+ assertEquals(TEST_DATA, result);
+
+ // Digest should be empty since it was turned off
+ MessageDigest md5 = mdis.getMessageDigest("MD5");
+ assertNotNull(md5);
+ byte[] emptyDigest = MessageDigest.getInstance("MD5").digest();
+ assertArrayEquals(emptyDigest, md5.digest());
+ }
+ }
+
+ @Test
+ void testOnOffWithPartialRead() throws Exception {
+ String firstPart = "12345";
+ String secondPart = "67890";
+ byte[] data = (firstPart + secondPart).getBytes(UTF_8);
+
+ try (MultiDigestInputStream mdis = new MultiDigestInputStream(new
ByteArrayInputStream(data),
+ Collections.singletonList(MessageDigest.getInstance("MD5")))) {
+ // Read first part with digest on
+ byte[] buffer1 = new byte[firstPart.length()];
+ int bytesRead1 = mdis.read(buffer1, 0, buffer1.length);
+ assertEquals(firstPart.length(), bytesRead1);
+ assertEquals(firstPart, new String(buffer1, UTF_8));
+
+ mdis.on(false);
+ byte[] buffer2 = new byte[secondPart.length()];
+ int bytesRead2 = mdis.read(buffer2, 0, buffer2.length);
+ assertEquals(secondPart.length(), bytesRead2);
+ assertEquals(secondPart, new String(buffer2, UTF_8));
+
+ // Digest should only contain first part
+ MessageDigest md5 = mdis.getMessageDigest("MD5");
+ byte[] expectedDigest =
MessageDigest.getInstance("MD5").digest(firstPart.getBytes(UTF_8));
+ assertArrayEquals(expectedDigest, md5.digest());
+ }
+ }
+
+ @Test
+ void testResetDigests() throws Exception {
+ byte[] data = TEST_DATA.getBytes(UTF_8);
+
+ try (MultiDigestInputStream mdis = new MultiDigestInputStream(new
ByteArrayInputStream(data),
+ Collections.singletonList(MessageDigest.getInstance("MD5")))) {
+
+ int byte1 = mdis.read();
+ int byte2 = mdis.read();
+ assertTrue(byte1 != -1 && byte2 != -1);
+
+ mdis.resetDigests();
+
+ MessageDigest md5 = mdis.getMessageDigest("MD5");
+ byte[] emptyDigest = MessageDigest.getInstance("MD5").digest();
+ assertArrayEquals(emptyDigest, md5.digest());
+ }
+ }
+
+ @Test
+ void testDigestManagement() throws Exception {
+ byte[] data = TEST_DATA.getBytes(UTF_8);
+
+ try (MultiDigestInputStream mdis = new MultiDigestInputStream(new
ByteArrayInputStream(data),
+ Arrays.asList(MessageDigest.getInstance("MD5"),
MessageDigest.getInstance("SHA-1")))) {
+
+ // Test initial state - getAllDigests
+ Map<String, MessageDigest> allDigests = mdis.getAllDigests();
+ assertEquals(2, allDigests.size());
+ assertTrue(allDigests.containsKey("MD5"));
+ assertTrue(allDigests.containsKey("SHA-1"));
+
+ // Test add
+ mdis.addMessageDigest("SHA-256");
+ assertNotNull(mdis.getMessageDigest("SHA-256"));
+ assertEquals(3, mdis.getAllDigests().size());
+
+ // Test set - replace with new instance
+ MessageDigest newMd5 = MessageDigest.getInstance("MD5");
+ mdis.setMessageDigest("MD5", newMd5);
+ assertNotNull(mdis.getMessageDigest("MD5"));
+
+ // Test remove
+ MessageDigest removed = mdis.removeMessageDigest("SHA-1");
+ assertNotNull(removed);
+ assertNull(mdis.getMessageDigest("SHA-1"));
+ assertEquals(2, mdis.getAllDigests().size());
+
+ // Test get non-existent
+ assertNull(mdis.getMessageDigest("SHA-512"));
+
+ // Read data and verify remaining digests work correctly
+ String result = IOUtils.toString(mdis, UTF_8);
+ assertEquals(TEST_DATA, result);
+
+ byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
+ assertArrayEquals(expectedMd5, mdis.getMessageDigest("MD5").digest());
+
+ byte[] expectedSha256 =
MessageDigest.getInstance("SHA-256").digest(data);
+ assertArrayEquals(expectedSha256,
mdis.getMessageDigest("SHA-256").digest());
+ }
+ }
+
+}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
index a9fd7da4200..7c1352e59dd 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
@@ -81,7 +81,7 @@ public void init() throws OS3Exception, IOException {
client.getObjectStore().createS3Bucket(BUCKET_NAME);
headers = mock(HttpHeaders.class);
-
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
rest = EndpointBuilder.newObjectEndpointBuilder()
.setClient(client)
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index a561343a518..ed6afb29a45 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -506,7 +506,7 @@ public void testPutEmptyObject() throws Exception {
private HttpHeaders newMockHttpHeaders() {
HttpHeaders httpHeaders = mock(HttpHeaders.class);
-
when(httpHeaders.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+
when(httpHeaders.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
return httpHeaders;
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
index 6cf7eea1336..d60752dfedd 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
@@ -74,7 +74,7 @@ public void init() throws OS3Exception, IOException {
// Create a key with object tags
Mockito.when(headers.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag2=value2");
Mockito.when(headers.getHeaderString(X_AMZ_CONTENT_SHA256))
- .thenReturn("mockSignature");
+ .thenReturn("UNSIGNED-PAYLOAD");
put(rest, BUCKET_NAME, KEY_WITH_TAG, CONTENT);
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
index 94942479cff..f1e166a138d 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
@@ -59,7 +59,7 @@ public void init() throws Exception {
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
Mockito.when(headers.getHeaderString(X_AMZ_CONTENT_SHA256))
- .thenReturn("mockSignature");
+ .thenReturn("UNSIGNED-PAYLOAD");
rest = EndpointBuilder.newObjectEndpointBuilder()
.setClient(client)
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
index f6f26515ea9..75ddd97bd24 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
@@ -65,7 +65,7 @@ void setup() throws Exception {
clientStub.getObjectStore().createS3Bucket(BUCKET_NAME);
HttpHeaders headers = mock(HttpHeaders.class);
-
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
// Create PutObject and setClient to OzoneClientStub
objectEndpoint = EndpointBuilder.newObjectEndpointBuilder()
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 4981069528a..2da2a42a05b 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -233,9 +233,13 @@ public void
testPartUploadMessageDigestResetDuringException() throws IOException
assertEquals(200, response.getStatus());
MessageDigest messageDigest = mock(MessageDigest.class);
+ when(messageDigest.getAlgorithm()).thenReturn("MD5");
+ MessageDigest sha256Digest = mock(MessageDigest.class);
+ when(sha256Digest.getAlgorithm()).thenReturn("SHA-256");
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
+ when(objectEndpoint.getSha256DigestInstance()).thenReturn(sha256Digest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class),
any(OutputStream.class), anyLong(),
anyLong(), any(byte[].class)))
.thenThrow(IOException.class);
@@ -251,6 +255,7 @@ public void
testPartUploadMessageDigestResetDuringException() throws IOException
// Verify that the message digest is reset so that the instance can be
reused for the
// next request in the same thread
verify(messageDigest, times(1)).reset();
+ verify(sha256Digest, times(1)).reset();
}
}
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
index dbe21601dbd..e9d70f67982 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
@@ -66,7 +66,7 @@ public void setUp() throws Exception {
client.getObjectStore().createS3Bucket(S3BUCKET);
HttpHeaders headers = mock(HttpHeaders.class);
-
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("STANDARD");
OzoneConfiguration conf = new OzoneConfiguration();
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
index c6c9face137..8aa4ba707ef 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
@@ -90,7 +90,8 @@ public void setup() throws Exception {
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
"STANDARD");
when(headers.getHeaderString(X_AMZ_CONTENT_SHA256))
- .thenReturn("mockSignature");
+ .thenReturn("UNSIGNED-PAYLOAD");
+
bucketEndpoint = EndpointBuilder.newBucketEndpointBuilder()
.setClient(clientStub)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]