This is an automated email from the ASF dual-hosted git repository.

ivandika3 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e2bdd9b9cf HDDS-15193. Move the "atomic key creation" logic from output stream to S3 endpoints (#10202)
8e2bdd9b9cf is described below

commit 8e2bdd9b9cf946581c8ea5ba8720ed19ddbeff39
Author: Peter Lee <[email protected]>
AuthorDate: Sat May 23 15:07:02 2026 +0800

    HDDS-15193. Move the "atomic key creation" logic from output stream to S3 endpoints (#10202)
---
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 15 ----
 .../ozone/client/io/KeyDataStreamOutput.java       | 28 +------
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 24 ------
 .../hadoop/ozone/client/io/OzoneOutputStream.java  | 26 +++++-
 .../ozone/client/protocol/ClientProtocol.java      |  2 -
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 20 +----
 .../hadoop/ozone/client/TestOzoneClient.java       | 34 --------
 .../ozone/client/io/TestOzoneOutputStream.java     | 45 +++++++++-
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     | 16 +++-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 63 +++++++-------
 .../ozone/s3/endpoint/ObjectEndpointStreaming.java | 67 +++++----------
 .../s3/endpoint/S3ObjectStreamingWriteGuard.java   | 55 +++++++++++++
 .../ozone/s3/endpoint/S3ObjectWriteGuard.java      | 96 ++++++++++++++++++++++
 .../hadoop/ozone/client/ClientProtocolStub.java    |  5 --
 .../hadoop/ozone/client/OzoneBucketStub.java       | 17 ++--
 .../ozone/s3/endpoint/EndpointTestUtils.java       | 57 ++++++++++++-
 .../hadoop/ozone/s3/endpoint/TestObjectPut.java    | 53 ++++++++----
 .../hadoop/ozone/s3/endpoint/TestPartUpload.java   | 95 +++++++++++----------
 .../ozone/s3/endpoint/TestUploadWithStream.java    | 30 +++++++
 19 files changed, 463 insertions(+), 285 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index ee5c7548757..9f94384d4df 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -74,14 +74,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final Future<Boolean> flushFuture;
   private final AtomicLong flushCheckpoint;
 
-  /**
-   * Indicates if an atomic write is required. When set to true,
-   * the amount of data written must match the declared size during the commit.
-   * A mismatch will prevent the commit from succeeding.
-   * This is essential for operations like S3 put to ensure atomicity.
-   */
-  private boolean atomicKeyCreation;
-
   private volatile boolean closed;
   private volatile boolean closing;
   // how much of data is actually written yet to underlying stream
@@ -130,7 +122,6 @@ private ECKeyOutputStream(Builder builder) {
       return flushStripeFromQueue();
     });
     this.flushCheckpoint = new AtomicLong(0);
-    this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
 
   @Override
@@ -489,12 +480,6 @@ public void close() throws IOException {
         Preconditions.checkArgument(writeOffset == offset,
             "Expected writeOffset= " + writeOffset
                 + " Expected offset=" + offset);
-        if (atomicKeyCreation) {
-          long expectedSize = blockOutputStreamEntryPool.getDataSize();
-          Preconditions.checkState(expectedSize == offset, String.format(
-              "Expected: %d and actual %d write sizes do not match",
-                  expectedSize, offset));
-        }
         for (CheckedRunnable<IOException> preCommit : preCommits) {
           preCommit.run();
         }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 7556f1e6d76..119af2f04e5 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -77,14 +77,6 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
 
   private long clientID;
 
-  /**
-   * Indicates if an atomic write is required. When set to true,
-   * the amount of data written must match the declared size during the commit.
-   * A mismatch will prevent the commit from succeeding.
-   * This is essential for operations like S3 put to ensure atomicity.
-   */
-  private boolean atomicKeyCreation;
-
   private List<CheckedRunnable<IOException>> preCommits = 
Collections.emptyList();
 
   @Override
@@ -130,7 +122,6 @@ public KeyDataStreamOutput() {
 
     this.writeOffset = 0;
     this.clientID = 0L;
-    this.atomicKeyCreation = false;
   }
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
@@ -141,8 +132,7 @@ public KeyDataStreamOutput(
       OzoneManagerProtocol omClient, int chunkSize,
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion,
-      boolean atomicKeyCreation
+      boolean unsafeByteBufferConversion
   ) {
     super(HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval()));
@@ -163,7 +153,6 @@ public KeyDataStreamOutput(
     // encrypted bucket.
     this.writeOffset = 0;
     this.clientID = handler.getId();
-    this.atomicKeyCreation = atomicKeyCreation;
   }
 
   /**
@@ -458,12 +447,6 @@ public void close() throws IOException {
       if (!isException()) {
         Preconditions.checkArgument(writeOffset == offset);
       }
-      if (atomicKeyCreation) {
-        long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
-        Preconditions.checkArgument(expectedSize == offset,
-            String.format("Expected: %d and actual %d write sizes do not 
match",
-                expectedSize, offset));
-      }
       for (CheckedRunnable<IOException> preCommit : preCommits) {
         preCommit.run();
       }
@@ -503,7 +486,6 @@ public static class Builder {
     private boolean unsafeByteBufferConversion;
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
-    private boolean atomicKeyCreation = false;
 
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
@@ -555,11 +537,6 @@ public Builder setReplicationConfig(ReplicationConfig 
replConfig) {
       return this;
     }
 
-    public Builder setAtomicKeyCreation(boolean atomicKey) {
-      this.atomicKeyCreation = atomicKey;
-      return this;
-    }
-
     public KeyDataStreamOutput build() {
       return new KeyDataStreamOutput(
           clientConfig,
@@ -572,8 +549,7 @@ public KeyDataStreamOutput build() {
           multipartUploadID,
           multipartNumber,
           isMultipartKey,
-          unsafeByteBufferConversion,
-          atomicKeyCreation);
+          unsafeByteBufferConversion);
     }
 
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 3a6499ea0c5..a3a1ca28030 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -98,13 +98,6 @@ public class KeyOutputStream extends OutputStream
   private long clientID;
   private StreamBufferArgs streamBufferArgs;
 
-  /**
-   * Indicates if an atomic write is required. When set to true,
-   * the amount of data written must match the declared size during the commit.
-   * A mismatch will prevent the commit from succeeding.
-   * This is essential for operations like S3 put to ensure atomicity.
-   */
-  private boolean atomicKeyCreation;
   private ContainerClientMetrics clientMetrics;
   private OzoneManagerVersion ozoneManagerVersion;
   private final Lock writeLock = new ReentrantLock();
@@ -187,7 +180,6 @@ public KeyOutputStream(Builder b) {
     this.isException = false;
     this.writeOffset = 0;
     this.clientID = b.getOpenHandler().getId();
-    this.atomicKeyCreation = b.getAtomicKeyCreation();
     this.streamBufferArgs = b.getStreamBufferArgs();
     this.clientMetrics = b.getClientMetrics();
     this.ozoneManagerVersion = b.ozoneManagerVersion;
@@ -657,12 +649,6 @@ private void closeInternal() throws IOException {
       if (!isException) {
         Preconditions.checkArgument(writeOffset == offset);
       }
-      if (atomicKeyCreation) {
-        long expectedSize = blockOutputStreamEntryPool.getDataSize();
-        Preconditions.checkState(expectedSize == offset,
-            String.format("Expected: %d and actual %d write sizes do not 
match",
-                expectedSize, offset));
-      }
       for (CheckedRunnable<IOException> preCommit : preCommits) {
         preCommit.run();
       }
@@ -703,7 +689,6 @@ public static class Builder {
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
     private ContainerClientMetrics clientMetrics;
-    private boolean atomicKeyCreation = false;
     private StreamBufferArgs streamBufferArgs;
     private Supplier<ExecutorService> executorServiceSupplier;
     private OzoneManagerVersion ozoneManagerVersion;
@@ -802,11 +787,6 @@ public Builder setReplicationConfig(ReplicationConfig 
replConfig) {
       return this;
     }
 
-    public Builder setAtomicKeyCreation(boolean atomicKey) {
-      this.atomicKeyCreation = atomicKey;
-      return this;
-    }
-
     public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
       this.clientMetrics = clientMetrics;
       return this;
@@ -816,10 +796,6 @@ public ContainerClientMetrics getClientMetrics() {
       return clientMetrics;
     }
 
-    public boolean getAtomicKeyCreation() {
-      return atomicKeyCreation;
-    }
-
     public Builder setExecutorServiceSupplier(Supplier<ExecutorService> 
executorServiceSupplier) {
       this.executorServiceSupplier = executorServiceSupplier;
       return this;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index c0e14b089ef..a7eda7da284 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -19,12 +19,14 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.ratis.util.function.CheckedRunnable;
 
 /**
  * OzoneOutputStream is used to write data into Ozone.
@@ -128,9 +130,9 @@ public void hsync() throws IOException {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    KeyOutputStream keyOutputStream = getKeyOutputStream();
-    if (keyOutputStream != null) {
-      return keyOutputStream.getCommitUploadPartInfo();
+    KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+    if (keyCommitOutput != null) {
+      return keyCommitOutput.getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
@@ -139,12 +141,23 @@ public OmMultipartCommitUploadPartInfo 
getCommitUploadPartInfo() {
   public OutputStream getOutputStream() {
     return outputStream;
   }
-  
+
   public KeyOutputStream getKeyOutputStream() {
     OutputStream base = unwrap(outputStream);
     return base instanceof KeyOutputStream ? (KeyOutputStream) base : null;
   }
 
+  public void setPreCommits(List<CheckedRunnable<IOException>> preCommits) {
+    KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+    if (keyCommitOutput != null) {
+      keyCommitOutput.setPreCommits(preCommits);
+      return;
+    }
+    throw new IllegalStateException(
+        "Output stream is not backed by KeyCommitOutput: " +
+            outputStream.getClass());
+  }
+
   @Override
   public Map<String, String> getMetadata() {
     OutputStream base = unwrap(outputStream);
@@ -155,6 +168,11 @@ public Map<String, String> getMetadata() {
         "OutputStream is not KeyMetadataAware: " + base.getClass());
   }
 
+  private KeyCommitOutput getKeyCommitOutput() {
+    OutputStream base = unwrap(outputStream);
+    return base instanceof KeyCommitOutput ? (KeyCommitOutput) base : null;
+  }
+
   private static OutputStream unwrap(OutputStream out) {
     if (out instanceof CryptoOutputStream) {
       return ((CryptoOutputStream) out).getWrappedStream();
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index c8611043fe4..8653048f8b2 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -1224,8 +1224,6 @@ OzoneKey headObject(String volumeName, String bucketName,
    */
   void setThreadLocalS3Auth(S3Auth s3Auth);
 
-  void setIsS3Request(boolean isS3Request);
-
   /**
    * Gets the S3 Authentication information that is attached to the thread.
    * @return S3 Authentication information.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 0b058362a30..d1f302a5cba 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -55,7 +55,6 @@
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.crypto.Cipher;
@@ -223,7 +222,6 @@ public class RpcClient implements ClientProtocol {
   private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
   private final ContainerClientMetrics clientMetrics;
   private final MemoizedSupplier<ExecutorService> writeExecutor;
-  private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
   private volatile OzoneFsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   private final long serverDefaultsValidityPeriod;
@@ -1473,13 +1471,6 @@ private OmKeyArgs.Builder 
createWriteKeyArgsBuilder(String volumeName,
   private OzoneOutputStream openOutputStream(OmKeyArgs keyArgs, long size)
       throws IOException {
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    // For bucket with layout OBJECT_STORE, when create an empty file (size=0),
-    // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
-    // which will cause S3G's atomic write length check to fail,
-    // so reset size to 0 here.
-    if (isS3GRequest.get() && size == 0) {
-      openKey.getKeyInfo().setDataSize(0);
-    }
     return createOutputStream(openKey);
   }
 
@@ -2588,15 +2579,12 @@ private OzoneDataStreamOutput 
createDataStreamOutput(OpenKeySession openKey)
   }
 
   private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
-    // Amazon S3 never adds partial objects, So for S3 requests we need to
-    // set atomicKeyCreation to true
     // refer: 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
     return new KeyDataStreamOutput.Builder()
         .setXceiverClientManager(xceiverClientManager)
         .setOmClient(ozoneManagerClient)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-        .setConfig(clientConfig)
-        .setAtomicKeyCreation(isS3GRequest.get());
+        .setConfig(clientConfig);
   }
 
   private OzoneOutputStream createOutputStream(OpenKeySession openKey)
@@ -2670,7 +2658,6 @@ private KeyOutputStream.Builder createKeyOutputStream(
         .setOmClient(ozoneManagerClient)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
         .setConfig(clientConfig)
-        .setAtomicKeyCreation(isS3GRequest.get())
         .setClientMetrics(clientMetrics)
         .setExecutorServiceSupplier(writeExecutor)
         .setStreamBufferArgs(streamBufferArgs)
@@ -2774,11 +2761,6 @@ public void setThreadLocalS3Auth(
     this.s3gUgi = 
UserGroupInformation.createRemoteUser(getThreadLocalS3Auth().getUserPrincipal());
   }
 
-  @Override
-  public void setIsS3Request(boolean s3Request) {
-    this.isS3GRequest.set(s3Request);
-  }
-
   @Override
   public S3Auth getThreadLocalS3Auth() {
     return ozoneManagerClient.getThreadLocalS3Auth();
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index 84b423a28ca..c96fe8bfc5b 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -220,40 +220,6 @@ public void testPutKeyWithECReplicationConfig() throws 
IOException {
     }
   }
 
-  /**
-   * This test validates that for S3G,
-   * the key upload process needs to be atomic.
-   * It simulates two mismatch scenarios where the actual write data size does
-   * not match the expected size.
-   */
-  @Test
-  public void testPutKeySizeMismatch() throws IOException {
-    String value = new String(new byte[1024], UTF_8);
-    OzoneBucket bucket = getOzoneBucket();
-    String keyName = UUID.randomUUID().toString();
-    try {
-      // Simulating first mismatch: Write less data than expected
-      client.getProxy().setIsS3Request(true);
-      OzoneOutputStream out1 = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
-          new HashMap<>());
-      out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
-      assertThrows(IllegalStateException.class, out1::close,
-          "Expected IllegalArgumentException due to size mismatch.");
-
-      // Simulating second mismatch: Write more data than expected
-      OzoneOutputStream out2 = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
-          new HashMap<>());
-      value += "1";
-      out2.write(value.getBytes(UTF_8));
-      assertThrows(IllegalStateException.class, out2::close,
-          "Expected IllegalArgumentException due to size mismatch.");
-    } finally {
-      client.getProxy().setIsS3Request(false);
-    }
-  }
-
   private OzoneBucket getOzoneBucket() throws IOException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
index d6a906582f4..a44d614417f 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
@@ -19,6 +19,7 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -26,8 +27,10 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -39,10 +42,10 @@ public class TestOzoneOutputStream {
    * Fake KeyOutputStream implementation for testing.
    * Uses the package-private KeyOutputStream() constructor.
    */
-  private static class FakeKeyOutputStream extends KeyOutputStream
-      implements KeyMetadataAware {
+  private static class FakeKeyOutputStream extends KeyOutputStream {
 
     private final Map<String, String> metadata;
+    private List<CheckedRunnable<IOException>> preCommits;
 
     FakeKeyOutputStream(Map<String, String> metadata) {
       super(); // VisibleForTesting constructor
@@ -54,6 +57,15 @@ public Map<String, String> getMetadata() {
       return metadata;
     }
 
+    @Override
+    public void setPreCommits(List<CheckedRunnable<IOException>> preCommits) {
+      this.preCommits = preCommits;
+    }
+
+    List<CheckedRunnable<IOException>> getPreCommits() {
+      return preCommits;
+    }
+
     @Override
     public void flush() {
       // avoid KeyOutputStream.flush() using null semaphore
@@ -133,6 +145,35 @@ public void testCipherWrapped() throws IOException {
     }
   }
 
+  @Test
+  public void testSetPreCommits() throws IOException {
+    FakeKeyOutputStream key =
+        new FakeKeyOutputStream(Collections.emptyMap());
+    List<CheckedRunnable<IOException>> preCommits =
+        Collections.singletonList(() -> { });
+
+    try (OzoneOutputStream ozone = new OzoneOutputStream(key, null)) {
+      ozone.setPreCommits(preCommits);
+    }
+
+    assertSame(preCommits, key.getPreCommits());
+  }
+
+  @Test
+  public void testSetPreCommitsRequiresKeyCommitOutput() throws IOException {
+    OutputStream stream = new OutputStream() {
+      @Override
+      public void write(int b) {
+
+      }
+    };
+
+    try (OzoneOutputStream ozone = new OzoneOutputStream(stream, null)) {
+      assertThrows(IllegalStateException.class,
+          () -> ozone.setPreCommits(Collections.emptyList()));
+    }
+  }
+
   /**
    * test for Non-KeyMetadataAware stream verify that exception is thrown here.
    */
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index e9c7f2c9fa8..224a823e14d 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -29,6 +29,7 @@
 import static 
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
 import static 
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
 import static 
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
+import static 
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.AWS_TAG_PREFIX;
@@ -107,6 +108,7 @@
 import org.apache.hadoop.ozone.s3.util.S3Utils;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -198,7 +200,6 @@ public void initialization() {
     ClientProtocol clientProtocol =
         getClient().getObjectStore().getClientProxy();
     clientProtocol.setThreadLocalS3Auth(s3Auth);
-    clientProtocol.setIsS3Request(true);
 
     bufferSize = (int) getOzoneConfiguration().getStorageSize(
         OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
@@ -690,6 +691,19 @@ public static MessageDigest getSha256DigestInstance() {
     return SHA_256_PROVIDER.get();
   }
 
+  protected static CheckedRunnable<IOException> validateContentLength(
+      long expectedLength, long actualLength, String keyPath) {
+    return () -> {
+      if (actualLength != expectedLength) {
+        OS3Exception ex = newError(INVALID_REQUEST, keyPath);
+        ex.setErrorMessage(String.format(
+            "Request body length %d does not match expected length %d",
+            actualLength, expectedLength));
+        throw ex;
+      }
+    };
+  }
+
   protected static String extractPartsCount(String eTag) {
     if (eTag.contains("-")) {
       String[] parts = eTag.replace("\"", "").split("-");
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 38cd6965a22..acf358e3ce8 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -54,8 +54,6 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -284,27 +282,27 @@ customMetadata, tags, multiDigestInputStream, 
getHeaders(),
       } else {
         final String amzContentSha256Header =
             validateSignatureHeader(getHeaders(), keyPath, 
signatureInfo.isSignPayload());
-        try (OzoneOutputStream output = openKeyForPut(
-            volume.getName(), bucketName, keyPath, length,
-            replicationConfig, customMetadata, tags, writeConditions)) {
+        final long expectedLength = length;
+        try (S3ObjectWriteGuard output =
+            new S3ObjectWriteGuard(openKeyForPut(
+                volume.getName(), bucketName, keyPath, expectedLength,
+                replicationConfig, customMetadata, tags, writeConditions),
+                expectedLength, keyPath)) {
           long metadataLatencyNs =
               getMetrics().updatePutKeyMetadataStats(startNanos);
           perf.appendMetaLatencyNanos(metadataLatencyNs);
-          putLength = IOUtils.copyLarge(multiDigestInputStream, output, 0, 
length,
-              new byte[getIOBufferSize(length)]);
+          putLength = output.copyFrom(multiDigestInputStream, 
getIOBufferSize(expectedLength));
           md5Hash = DatatypeConverter.printHexBinary(
                   
multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest())
               .toLowerCase();
           output.getMetadata().put(OzoneConsts.ETAG, md5Hash);
 
-          List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
-
           String clientContentMD5 = 
getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER);
           if (clientContentMD5 != null) {
             CheckedRunnable<IOException> checkContentMD5Hook = () -> {
               S3Utils.validateContentMD5(clientContentMD5, md5Hash, keyPath);
             };
-            preCommits.add(checkContentMD5Hook);
+            output.addPreCommit(checkContentMD5Hook);
           }
 
           // If sha256Digest exists, this request must validate 
x-amz-content-sha256
@@ -317,9 +315,8 @@ customMetadata, tags, multiDigestInputStream, getHeaders(),
                 throw 
S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
               }
             };
-            preCommits.add(checkSha256Hook);
+            output.addPreCommit(checkSha256Hook);
           }
-          output.getKeyOutputStream().setPreCommits(preCommits);
         }
       }
       getMetrics().incPutKeySuccessLength(putLength);
@@ -884,18 +881,19 @@ private Response createMultipartKey(OzoneVolume volume, 
OzoneBucket ozoneBucket,
                       + rangeHeader.getStartOffset() + " actual: " + skipped);
             }
           }
-          try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
-              .createMultipartKey(volume.getName(), bucketName, key, length,
-                  partNumber, uploadID)) {
+          final long expectedLength = length;
+          OzoneOutputStream ozoneOutputStream = getClientProtocol()
+              .createMultipartKey(volume.getName(), bucketName, key,
+                  expectedLength, partNumber, uploadID);
+          try (S3ObjectWriteGuard writeGuard =
+              new S3ObjectWriteGuard(ozoneOutputStream, expectedLength, key)) {
             metadataLatencyNs =
                 getMetrics().updateCopyKeyMetadataStats(startNanos);
-            copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream, 0, 
length,
-                new byte[getIOBufferSize(length)]);
-            ozoneOutputStream.getMetadata()
-                .putAll(sourceKeyDetails.getMetadata());
-            String raw = ozoneOutputStream.getMetadata().get(OzoneConsts.ETAG);
+            copyLength = writeGuard.copyFrom(sourceObject, 
getIOBufferSize(expectedLength));
+            writeGuard.getMetadata().putAll(sourceKeyDetails.getMetadata());
+            String raw = writeGuard.getMetadata().get(OzoneConsts.ETAG);
             if (raw != null) {
-              ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, 
stripQuotes(raw));
+              writeGuard.getMetadata().put(OzoneConsts.ETAG, stripQuotes(raw));
             }
             outputStream = ozoneOutputStream;
           }
@@ -904,13 +902,13 @@ private Response createMultipartKey(OzoneVolume volume, 
OzoneBucket ozoneBucket,
         }
       } else {
         long putLength;
-        try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
-            .createMultipartKey(volume.getName(), bucketName, key, length,
-                partNumber, uploadID)) {
+        final long expectedLength = length;
+        OzoneOutputStream ozoneOutputStream = getClientProtocol()
+            .createMultipartKey(volume.getName(), bucketName, key, 
expectedLength, partNumber, uploadID);
+        try (S3ObjectWriteGuard writeGuard = new 
S3ObjectWriteGuard(ozoneOutputStream, expectedLength, key)) {
           metadataLatencyNs =
               getMetrics().updatePutKeyMetadataStats(startNanos);
-          putLength = IOUtils.copyLarge(multiDigestInputStream, 
ozoneOutputStream, 0, length,
-              new byte[getIOBufferSize(length)]);
+          putLength = writeGuard.copyFrom(multiDigestInputStream, 
getIOBufferSize(expectedLength));
           byte[] digest = 
multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest();
           String md5Hash = 
DatatypeConverter.printHexBinary(digest).toLowerCase();
           String clientContentMD5 = 
getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER);
@@ -918,9 +916,9 @@ private Response createMultipartKey(OzoneVolume volume, 
OzoneBucket ozoneBucket,
             CheckedRunnable<IOException> checkContentMD5Hook = () -> {
               S3Utils.validateContentMD5(clientContentMD5, md5Hash, key);
             };
-            
ozoneOutputStream.getKeyOutputStream().setPreCommits(Collections.singletonList(checkContentMD5Hook));
+            writeGuard.addPreCommit(checkContentMD5Hook);
           }
-          ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, md5Hash);
+          writeGuard.getMetadata().put(OzoneConsts.ETAG, md5Hash);
           outputStream = ozoneOutputStream;
         }
         getMetrics().incPutKeySuccessLength(putLength);
@@ -991,13 +989,14 @@ srcKeyLen > getDatastreamMinLength()) {
               getChunkSize(), replication, metadata, src, perf, startNanos, 
tags,
               writeConditions);
     } else {
-      try (OzoneOutputStream dest = openKeyForPut(
-          volume.getName(), destBucket, destKey, srcKeyLen,
-          replication, metadata, tags, writeConditions)) {
+      final long expectedLength = srcKeyLen;
+      try (S3ObjectWriteGuard dest = new S3ObjectWriteGuard(openKeyForPut(
+          volume.getName(), destBucket, destKey, expectedLength,
+          replication, metadata, tags, writeConditions), expectedLength, 
destKey)) {
         long metadataLatencyNs =
             getMetrics().updateCopyKeyMetadataStats(startNanos);
         perf.appendMetaLatencyNanos(metadataLatencyNs);
-        copyLength = IOUtils.copyLarge(src, dest, 0, srcKeyLen, new 
byte[getIOBufferSize(srcKeyLen)]);
+        copyLength = dest.copyFrom(src, getIOBufferSize(expectedLength));
         String md5Hash = 
DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
         dest.getMetadata().put(OzoneConsts.ETAG, md5Hash);
       }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index fa1cde66049..55420a1668c 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -24,12 +24,8 @@
 import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
@@ -38,7 +34,6 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.OmConfig;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -121,23 +116,23 @@ public static Pair<String, Long> putKeyWithStream(
     final String amzContentSha256Header = validateSignatureHeader(headers, 
keyPath, isSignedPayload);
     long writeLen;
     String md5Hash;
-    try (OzoneDataStreamOutput streamOutput = openStreamKeyForPut(bucket,
-        keyPath, length, replicationConfig, keyMetadata, tags,
-        writeConditions)) {
+    try (S3ObjectStreamingWriteGuard writeGuard =
+        new S3ObjectStreamingWriteGuard(openStreamKeyForPut(bucket,
+            keyPath, length, replicationConfig, keyMetadata, tags,
+            writeConditions), length, keyPath)) {
       long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
-      writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
+      writeLen = writeGuard.copyFrom(body, bufferSize);
       md5Hash = 
DatatypeConverter.printHexBinary(body.getMessageDigest(OzoneConsts.MD5_HASH).digest())
           .toLowerCase();
       perf.appendMetaLatencyNanos(metadataLatencyNs);
-      ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, 
md5Hash);
+      writeGuard.getMetadata().put(OzoneConsts.ETAG, md5Hash);
 
-      List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
       String clientContentMD5 = 
headers.getHeaderString(S3Consts.CHECKSUM_HEADER);
       if (clientContentMD5 != null) {
         CheckedRunnable<IOException> checkContentMD5Hook = () -> {
           S3Utils.validateContentMD5(clientContentMD5, md5Hash, keyPath);
         };
-        preCommits.add(checkContentMD5Hook);
+        writeGuard.addPreCommit(checkContentMD5Hook);
       }
 
       // If sha256Digest exists, this request must validate 
x-amz-content-sha256
@@ -150,10 +145,8 @@ public static Pair<String, Long> putKeyWithStream(
             throw 
S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
           }
         };
-        preCommits.add(checkSha256Hook);
+        writeGuard.addPreCommit(checkSha256Hook);
       }
-
-      streamOutput.setPreCommits(preCommits);
     }
     return Pair.of(md5Hash, writeLen);
   }
@@ -189,38 +182,21 @@ public static long copyKeyWithStream(
       S3ConditionalRequest.WriteConditions writeConditions)
       throws IOException {
     long writeLen;
-    try (OzoneDataStreamOutput streamOutput = openStreamKeyForPut(bucket,
-        keyPath, length, replicationConfig, keyMetadata, tags,
-        writeConditions)) {
+    try (S3ObjectStreamingWriteGuard writeGuard =
+        new S3ObjectStreamingWriteGuard(openStreamKeyForPut(bucket,
+            keyPath, length, replicationConfig, keyMetadata, tags,
+            writeConditions), length, keyPath)) {
       long metadataLatencyNs =
           METRICS.updateCopyKeyMetadataStats(startNanos);
-      writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
+      writeLen = writeGuard.copyFrom(body, bufferSize);
       String eTag = 
DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
           .toLowerCase();
       perf.appendMetaLatencyNanos(metadataLatencyNs);
-      ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, 
eTag);
+      writeGuard.getMetadata().put(OzoneConsts.ETAG, eTag);
     }
     return writeLen;
   }
 
-  private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput,
-                                          InputStream body, int bufferSize,
-                                          long length)
-      throws IOException {
-    final byte[] buffer = new byte[bufferSize];
-    long n = 0;
-    while (n < length) {
-      final int toRead = Math.toIntExact(Math.min(bufferSize, length - n));
-      final int readLength = body.read(buffer, 0, toRead);
-      if (readLength == -1) {
-        break;
-      }
-      streamOutput.write(ByteBuffer.wrap(buffer, 0, readLength));
-      n += readLength;
-    }
-    return n;
-  }
-
   @SuppressWarnings("checkstyle:ParameterNumber")
   public static Response createMultipartKey(OzoneBucket ozoneBucket, String 
key,
       long length, int partNumber, String uploadID, int chunkSize,
@@ -229,23 +205,22 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
     long startNanos = Time.monotonicNowNanos();
     String eTag;
     try {
-      try (OzoneDataStreamOutput streamOutput = ozoneBucket
-          .createMultipartStreamKey(key, length, partNumber, uploadID)) {
+      try (S3ObjectStreamingWriteGuard writeGuard =
+          new S3ObjectStreamingWriteGuard(ozoneBucket
+              .createMultipartStreamKey(key, length, partNumber, uploadID),
+              length, key)) {
         long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
-        long putLength =
-            writeToStreamOutput(streamOutput, body, chunkSize, length);
+        long putLength = writeGuard.copyFrom(body, chunkSize);
         eTag = DatatypeConverter.printHexBinary(
             
body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase();
-        List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
         String clientContentMD5 = 
headers.getHeaderString(S3Consts.CHECKSUM_HEADER);
         if (clientContentMD5 != null) {
           CheckedRunnable<IOException> checkContentMD5Hook = () -> {
             S3Utils.validateContentMD5(clientContentMD5, eTag, key);
           };
-          preCommits.add(checkContentMD5Hook);
+          writeGuard.addPreCommit(checkContentMD5Hook);
         }
-        streamOutput.setPreCommits(preCommits);
-        ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, 
eTag);
+        writeGuard.getMetadata().put(OzoneConsts.ETAG, eTag);
         METRICS.incPutKeySuccessLength(putLength);
         perf.appendMetaLatencyNanos(metadataLatencyNs);
         perf.appendSizeBytes(putLength);
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectStreamingWriteGuard.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectStreamingWriteGuard.java
new file mode 100644
index 00000000000..31e862b9796
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectStreamingWriteGuard.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+
+/**
+ * Guards close-time commit for write request using datastream output.
+ */
+final class S3ObjectStreamingWriteGuard extends S3ObjectWriteGuard {
+
+  private final OzoneDataStreamOutput outputStream;
+
+  S3ObjectStreamingWriteGuard(
+      OzoneDataStreamOutput outputStream, long expectedLength, String keyPath) 
{
+    super(outputStream, expectedLength, keyPath);
+    this.outputStream = outputStream;
+    outputStream.setPreCommits(getPreCommits());
+  }
+
+  @Override
+  protected void write(byte[] buffer, int offset, int length)
+      throws IOException {
+    outputStream.write(ByteBuffer.wrap(buffer, offset, length));
+  }
+
+  @Override
+  public Map<String, String> getMetadata() {
+    return ((KeyMetadataAware) outputStream).getMetadata();
+  }
+
+  @Override
+  public void close() throws IOException {
+    outputStream.close();
+  }
+}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectWriteGuard.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectWriteGuard.java
new file mode 100644
index 00000000000..45dcfb15088
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectWriteGuard.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.ratis.util.function.CheckedRunnable;
+
+/**
+ * Tracks bytes written for a write request and guards close-time commit.
+ */
+class S3ObjectWriteGuard implements AutoCloseable {
+
+  private final OutputStream outputStream;
+  private final long expectedLength;
+  private final List<CheckedRunnable<IOException>> preCommits =
+      new ArrayList<>();
+  private long writtenLength;
+
+  S3ObjectWriteGuard(
+      OzoneOutputStream outputStream, long expectedLength, String keyPath) {
+    this.outputStream = outputStream;
+    this.expectedLength = expectedLength;
+    addContentLengthValidation(keyPath);
+    outputStream.setPreCommits(getPreCommits());
+  }
+
+  protected S3ObjectWriteGuard(
+      OutputStream outputStream, long expectedLength, String keyPath) {
+    this.outputStream = outputStream;
+    this.expectedLength = expectedLength;
+    addContentLengthValidation(keyPath);
+  }
+
+  private void addContentLengthValidation(String keyPath) {
+    preCommits.add(() -> EndpointBase.validateContentLength(
+        expectedLength, writtenLength, keyPath).run());
+  }
+
+  protected List<CheckedRunnable<IOException>> getPreCommits() {
+    return preCommits;
+  }
+
+  void addPreCommit(CheckedRunnable<IOException> preCommit) {
+    preCommits.add(preCommit);
+  }
+
+  long copyFrom(InputStream body, int bufferSize) throws IOException {
+    byte[] buffer = new byte[bufferSize];
+    while (writtenLength < expectedLength) {
+      int toRead = Math.toIntExact(
+          Math.min(bufferSize, expectedLength - writtenLength));
+      int readLength = body.read(buffer, 0, toRead);
+      if (readLength == -1) {
+        break;
+      }
+      write(buffer, 0, readLength);
+      writtenLength += readLength;
+    }
+    return writtenLength;
+  }
+
+  protected void write(byte[] buffer, int offset, int length)
+      throws IOException {
+    outputStream.write(buffer, offset, length);
+  }
+
+  public Map<String, String> getMetadata() {
+    return ((OzoneOutputStream) outputStream).getMetadata();
+  }
+
+  @Override
+  public void close() throws IOException {
+    outputStream.close();
+  }
+}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 35956bc1df8..0ef0a790388 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -688,11 +688,6 @@ public void setThreadLocalS3Auth(S3Auth s3Auth) {
 
   }
 
-  @Override
-  public void setIsS3Request(boolean isS3Request) {
-
-  }
-
   @Override
   public S3Auth getThreadLocalS3Auth() {
     return null;
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 3b12b72540f..10ed365b2b0 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -145,7 +145,9 @@ public OzoneOutputStream createKey(String key, long size,
         new KeyMetadataAwareOutputStream(metadata) {
           @Override
           public void close() throws IOException {
-            keyContents.put(key, toByteArray());
+            byte[] bytes = toByteArray();
+            super.close();
+            keyContents.put(key, bytes);
             keyDetails.put(key, new OzoneKeyDetails(
                 getVolumeName(),
                 getName(),
@@ -158,7 +160,6 @@ public void close() throws IOException {
                 UserGroupInformation.getCurrentUser().getShortUserName(),
                 tags
             ));
-            super.close();
           }
         };
 
@@ -179,7 +180,9 @@ public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyG
         new KeyMetadataAwareOutputStream(metadata) {
           @Override
           public void close() throws IOException {
-            keyContents.put(keyName, toByteArray());
+            byte[] bytes = toByteArray();
+            super.close();
+            keyContents.put(keyName, bytes);
             keyDetails.put(keyName, new OzoneKeyDetails(
                 getVolumeName(),
                 getName(),
@@ -190,7 +193,6 @@ public void close() throws IOException {
                 new ArrayList<>(), finalReplicationCon, metadata, null,
                 () -> readKey(keyName), true, null, null
             ));
-            super.close();
           }
         };
 
@@ -531,8 +533,10 @@ public OzoneOutputStream createMultipartKey(String key, long size,
           new KeyMetadataAwareOutputStream((int) size, new HashMap<>()) {
             @Override
             public void close() throws IOException {
-              Part part = new Part(key + size,
-                  toByteArray(), getMetadata().get(ETAG));
+              byte[] bytes = toByteArray();
+              String eTag = getMetadata().get(ETAG);
+              super.close();
+              Part part = new Part(key + size, bytes, eTag);
               if (partList.get(key) == null) {
                 Map<Integer, Part> parts = new TreeMap<>();
                 parts.put(partNumber, part);
@@ -540,7 +544,6 @@ public void close() throws IOException {
               } else {
                 partList.get(key).put(partNumber, part);
               }
-              super.close();
             }
           };
       return new OzoneOutputStreamStub(keyOutputStream, key + size);
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
index 11ba8daf804..8827150069f 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
@@ -25,6 +25,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.HttpHeaders;
@@ -123,13 +124,28 @@ public static Response put(
       int partNumber,
       String uploadID,
       String content
+  ) throws IOException, OS3Exception {
+    return put(subject, bucket, key, partNumber, uploadID,
+        contentLength(content), content);
+  }
+
+  /** Put with content, part number, upload ID, and explicit Content-Length. */
+  public static Response put(
+      ObjectEndpoint subject,
+      String bucket,
+      String key,
+      int partNumber,
+      String uploadID,
+      long contentLength,
+      String content
   ) throws IOException, OS3Exception {
     if (uploadID != null) {
       subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, 
uploadID);
     }
     subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER, 
partNumber);
     when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT);
-    setLengthHeader(subject, content);
+    when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
+        .thenReturn(String.valueOf(contentLength));
 
     if (content == null) {
       return subject.put(bucket, key, null);
@@ -264,9 +280,44 @@ public static OS3Exception assertErrorResponse(S3ErrorTable expected, CheckedSup
   }
 
   private static void setLengthHeader(ObjectEndpoint subject, String content) {
-    final long length = content != null ? content.length() : 0;
     when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
-        .thenReturn(String.valueOf(length));
+        .thenReturn(String.valueOf(contentLength(content)));
+  }
+
+  private static long contentLength(String content) {
+    return content != null ? content.getBytes(UTF_8).length : 0;
+  }
+
+  static final class FailingInputStream extends InputStream {
+
+    private final byte[] content;
+    private final int failAfterBytes;
+    private int position;
+
+    FailingInputStream(byte[] content, int failAfterBytes) {
+      this.content = content;
+      this.failAfterBytes = failAfterBytes;
+    }
+
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      if (position >= failAfterBytes) {
+        throw new IOException("upload interrupted");
+      }
+
+      int bytesToRead = Math.min(length, failAfterBytes - position);
+      System.arraycopy(content, position, buffer, offset, bytesToRead);
+      position += bytesToRead;
+      return bytesToRead;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (position >= failAfterBytes) {
+        throw new IOException("upload interrupted");
+      }
+      return content[position++];
+    }
   }
 
   private EndpointTestUtils() {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index bcd4cdd9084..c9313207c01 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.s3.endpoint;
 
 import static 
org.apache.hadoop.ozone.client.OzoneClientTestUtils.assertKeyContent;
+import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.FailingInputStream;
 import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertErrorResponse;
 import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
 import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.put;
@@ -44,8 +45,10 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
@@ -55,19 +58,17 @@
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 import java.util.Map;
 import java.util.stream.Stream;
+import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -264,15 +265,18 @@ void testPutObjectWithSignedChunks() throws Exception {
   @Test
   public void testPutObjectMessageDigestResetDuringException() {
     MessageDigest messageDigest = mock(MessageDigest.class);
-    try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class);
-        MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
+    try (MockedStatic<EndpointBase> endpoint =
+        mockStatic(EndpointBase.class, CALLS_REAL_METHODS)) {
       // For example, EOFException during put-object due to client cancelling 
the operation before it completes
-      mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), 
any(OutputStream.class), anyLong(),
-              anyLong(), any(byte[].class)))
-          .thenThrow(IOException.class);
+      when(messageDigest.getAlgorithm()).thenReturn(OzoneConsts.MD5_HASH);
       
endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
+      setPutRequestLength(CONTENT.length());
 
-      assertThrows(IOException.class, () -> putObject(CONTENT).close());
+      IOException ex = assertThrows(IOException.class,
+          () -> objectEndpoint.put(BUCKET_NAME, KEY_NAME,
+              new FailingInputStream(CONTENT.getBytes(StandardCharsets.UTF_8), 
5)).close());
+      assertEquals("upload interrupted", ex.getMessage());
+      assertThrows(IOException.class, () -> bucket.getKey(KEY_NAME));
 
       // Verify that the message digest is reset so that the instance can be 
reused for the
       // next request in the same thread
@@ -386,20 +390,21 @@ public void testCopyObjectMessageDigestResetDuringException() throws Exception {
     assertThat(keyDetails.getMetadata().get(OzoneConsts.ETAG)).isNotEmpty();
 
     MessageDigest messageDigest = mock(MessageDigest.class);
-    try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class);
-        MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
+    try (MockedStatic<EndpointBase> endpoint =
+        mockStatic(EndpointBase.class, CALLS_REAL_METHODS)) {
       // Add the mocked methods only during the copy request
       
endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
-      endpoint.when(() -> 
EndpointBase.parseSourceHeader(any())).thenCallRealMethod();
-      mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), 
any(OutputStream.class), anyLong(),
-              anyLong(), any(byte[].class)))
-          .thenThrow(IOException.class);
+      doThrow(new RuntimeException("digest interrupted"))
+          .when(messageDigest).update(any(byte[].class), anyInt(), anyInt());
 
       // Add copy header, and then call put
       when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
           BUCKET_NAME  + "/" + urlEncode(KEY_NAME));
 
-      assertThrows(IOException.class, () -> putObject(DEST_BUCKET_NAME, 
DEST_KEY).close());
+      RuntimeException ex = assertThrows(RuntimeException.class,
+          () -> putObject(DEST_BUCKET_NAME, DEST_KEY).close());
+      assertEquals("digest interrupted", ex.getMessage());
+      assertThrows(IOException.class, () -> destBucket.getKey(DEST_KEY));
       // Verify that the message digest is reset so that the instance can be 
reused for the
       // next request in the same thread
       verify(messageDigest, times(1)).reset();
@@ -641,6 +646,14 @@ public void testPutEmptyObject() throws Exception {
     assertEquals(0, bucket.getKey(KEY_NAME).getDataSize());
   }
 
+  @Test
+  public void testPutObjectRejectsIncompleteBody() {
+    assertErrorResponse(INVALID_REQUEST,
+        () -> put(objectEndpoint, BUCKET_NAME, KEY_NAME, 0,
+            null, CONTENT.length() + 1, CONTENT));
+    assertThrows(IOException.class, () -> bucket.getKey(KEY_NAME));
+  }
+
   @Test
   public void testPutObjectWithContentMD5() throws Exception {
     // GIVEN
@@ -804,4 +817,10 @@ private Response putObject(String bucketName, String keyName) throws IOException
   private Response putObject(String content) throws IOException, OS3Exception {
     return put(objectEndpoint, BUCKET_NAME, KEY_NAME, content);
   }
+
+  private void setPutRequestLength(long length) {
+    when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT);
+    when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH))
+        .thenReturn(String.valueOf(length));
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 8f4cf0069d7..49adfa06511 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.s3.endpoint;
 
+import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.FailingInputStream;
 import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertErrorResponse;
 import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
 import static 
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.initiateMultipartUpload;
@@ -27,10 +28,8 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
@@ -40,16 +39,15 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 import java.util.UUID;
 import java.util.stream.Stream;
+import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -58,6 +56,7 @@
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.util.S3Consts;
 import org.apache.hadoop.ozone.s3.util.S3StorageType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -142,33 +141,27 @@ public void testPartUploadWithStandardIA() throws Exception {
     assertContentLength(uploadID, keyName, content.length());
   }
 
-  @Test
-  public void testPartUploadWithStandardIAAndContentMD5() throws Exception {
-    when(headers.getHeaderString(STORAGE_CLASS_HEADER))
-        .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null);
-    String content = "Multipart Upload Part";
-    byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
-    byte[] md5Bytes = MessageDigest.getInstance("MD5").digest(contentBytes);
-    String md5Base64 = Base64.getEncoder().encodeToString(md5Bytes);
-    when(headers.getHeaderString("Content-MD5")).thenReturn(md5Base64);
-
-    String keyName = UUID.randomUUID().toString();
-    String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, 
keyName);
-
-    try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1,
-        uploadID, content)) {
-      assertNotNull(response.getHeaderString(OzoneConsts.ETAG));
-      assertEquals(200, response.getStatus());
-    }
-    assertContentLength(uploadID, keyName, content.length());
-  }
-
   @Test
   public void testPartUploadWithIncorrectUploadID() {
     assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD,
         () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1, "random", 
"any"));
   }
 
+  @Test
+  public void testPartUploadRejectsIncompleteBody() throws Exception {
+    String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET,
+        OzoneConsts.KEY);
+    String content = "Multipart Upload";
+
+    assertErrorResponse(S3ErrorTable.INVALID_REQUEST,
+        () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY,
+            1, uploadID, content.length() + 1, content));
+    OzoneMultipartUploadPartListParts parts =
+        client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+            .listParts(OzoneConsts.KEY, uploadID, 0, 100);
+    assertEquals(0, parts.getPartInfoList().size());
+  }
+
   @Test
   public void testPartUploadStreamContentLength()
       throws IOException, OS3Exception {
@@ -195,34 +188,28 @@ public void testPartUploadMessageDigestResetDuringException() throws IOException
     String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY);
 
     MessageDigest messageDigest = mock(MessageDigest.class);
-    when(messageDigest.getAlgorithm()).thenReturn("MD5");
+    when(messageDigest.getAlgorithm()).thenReturn(OzoneConsts.MD5_HASH);
     MessageDigest sha256Digest = mock(MessageDigest.class);
-    when(sha256Digest.getAlgorithm()).thenReturn("SHA-256");
-    try (MockedStatic<IOUtils> ioutils = mockStatic(IOUtils.class);
-        MockedStatic<ObjectEndpointStreaming> streaming = mockStatic(ObjectEndpointStreaming.class);
-        MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
+    when(sha256Digest.getAlgorithm()).thenReturn(OzoneConsts.FILE_HASH);
+    try (MockedStatic<EndpointBase> endpoint =
+        mockStatic(EndpointBase.class, CALLS_REAL_METHODS)) {
       // Add the mocked methods only during part upload
       endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
       endpoint.when(EndpointBase::getSha256DigestInstance).thenReturn(sha256Digest);
-      if (enableDataStream) {
-        streaming.when(() -> ObjectEndpointStreaming.createMultipartKey(any(), any(), anyLong(), anyInt(), any(),
-                anyInt(), any(), any(), any()))
-            .thenThrow(IOException.class);
-      } else {
-        ioutils.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class), anyLong(),
-                anyLong(), any(byte[].class)))
-            .thenThrow(IOException.class);
-      }
 
       String content = "Multipart Upload";
-      try (Response ignored = put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1, uploadID, content)) {
-        fail("Should throw IOException");
-      } catch (IOException ignored) {
-        // Verify that the message digest is reset so that the instance can be reused for the
-        // next request in the same thread
-        verify(messageDigest, times(1)).reset();
-        verify(sha256Digest, times(1)).reset();
+      try (InputStream body = new FailingInputStream(
+          content.getBytes(StandardCharsets.UTF_8), 5)) {
+        IOException ex = assertThrows(IOException.class,
+            () -> putPart(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1,
+                uploadID, content.length(), body).close());
+        assertEquals("upload interrupted", ex.getMessage());
       }
+
+      // Verify that the message digest is reset so that the instance can be reused for the
+      // next request in the same thread
+      verify(messageDigest, times(1)).reset();
+      verify(sha256Digest, times(1)).reset();
     }
   }
 
@@ -293,4 +280,16 @@ private void assertContentLength(String uploadID, String key,
     assertEquals(contentLength,
         parts.getPartInfoList().get(0).getSize());
   }
+
+  private static Response putPart(ObjectEndpoint subject, String bucket,
+      String key, int partNumber, String uploadID, long contentLength,
+      InputStream body) throws IOException, OS3Exception {
+    subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
+    subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER,
+        partNumber);
+    when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT);
+    when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
+        .thenReturn(String.valueOf(contentLength));
+    return subject.put(bucket, key, body);
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
index 66d4a4cbef6..ddbfeb2b866 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
@@ -19,17 +19,22 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.FailingInputStream;
 import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
 import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.put;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.X_AMZ_CONTENT_SHA256;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
@@ -39,9 +44,12 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -92,6 +100,28 @@ public void testUpload() throws Exception {
     assertSucceeds(() -> put(rest, S3BUCKET, S3KEY, S3_COPY_EXISTING_KEY_CONTENT));
   }
 
+  @Test
+  public void testUploadDoesNotCommitWhenBodyReadFails() throws Exception {
+    OzoneBucket bucket = client.getObjectStore().getS3Bucket(S3BUCKET);
+    byte[] keyContent = S3_COPY_EXISTING_KEY_CONTENT.getBytes(UTF_8);
+    try (FailingInputStream failing = new FailingInputStream(keyContent, 5);
+        MultiDigestInputStream body = new MultiDigestInputStream(failing,
+            Collections.singletonList(
+                MessageDigest.getInstance(OzoneConsts.MD5_HASH)))) {
+
+      IOException ex = assertThrows(IOException.class,
+          () -> ObjectEndpointStreaming.put(bucket, S3KEY, keyContent.length,
+              ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+                  ReplicationFactor.THREE), rest.getChunkSize(),
+              new HashMap<>(), new HashMap<>(), body, rest.getHeaders(), true,
+              new AuditLogger.PerformanceStringBuilder(),
+              S3ConditionalRequest.parseWriteConditions(rest.getHeaders(),
+                  S3KEY)));
+      assertEquals("upload interrupted", ex.getMessage());
+      assertThrows(IOException.class, () -> bucket.getKey(S3KEY));
+    }
+  }
+
   @Test
   public void testUploadWithCopy() throws Exception {
     OzoneBucket bucket =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to