This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f646b1e9bf2 HDDS-15213. Use common commit output for stream outputs (#10224)
f646b1e9bf2 is described below

commit f646b1e9bf2c00264ea812cfe3a486bc48ce2d5a
Author: Peter Lee <[email protected]>
AuthorDate: Sun May 17 21:09:59 2026 +0800

    HDDS-15213. Use common commit output for stream outputs (#10224)
---
 .../hadoop/ozone/client/io/KeyCommitOutput.java    | 35 +++++++++++
 .../ozone/client/io/KeyDataStreamOutput.java       |  4 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  6 +-
 .../ozone/client/io/OzoneDataStreamOutput.java     | 67 ++++++++++++++++------
 .../ozone/s3/endpoint/ObjectEndpointStreaming.java |  7 ++-
 .../hadoop/ozone/client/OzoneBucketStub.java       | 25 +++++++-
 .../hadoop/ozone/s3/endpoint/TestPartUpload.java   | 37 ++++++++++++
 7 files changed, 153 insertions(+), 28 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java
new file mode 100644
index 00000000000..32a1638f050
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.ratis.util.function.CheckedRunnable;
+
+/**
+ * Common commit-time behavior for key output implementations.
+ */
+interface KeyCommitOutput extends KeyMetadataAware {
+
+  void setPreCommits(
+      @Nonnull List<CheckedRunnable<IOException>> preCommits);
+
+  OmMultipartCommitUploadPartInfo getCommitUploadPartInfo();
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index ceacd624e93..7556f1e6d76 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -60,7 +60,7 @@
  * TODO : currently not support multi-thread access.
  */
 public class KeyDataStreamOutput extends AbstractDataStreamOutput
-    implements KeyMetadataAware {
+    implements KeyCommitOutput {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyDataStreamOutput.class);
@@ -87,6 +87,7 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput
 
   private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();
 
+  @Override
   public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) {
     this.preCommits = preCommits;
   }
@@ -472,6 +473,7 @@ public void close() throws IOException {
     }
   }
 
+  @Override
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
     return blockDataStreamOutputEntryPool.getCommitUploadPartInfo();
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 2f9edfa94ea..b42e150ecff 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -76,7 +76,7 @@
  * TODO : currently not support multi-thread access.
  */
 public class KeyOutputStream extends OutputStream
-    implements Syncable, KeyMetadataAware {
+    implements Syncable, KeyCommitOutput {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyOutputStream.class);
@@ -114,6 +114,7 @@ public class KeyOutputStream extends OutputStream
   private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
   private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();
 
+  @Override
   public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) {
     this.preCommits = preCommits;
   }
@@ -671,7 +672,8 @@ private void closeInternal() throws IOException {
     }
   }
 
-  synchronized OmMultipartCommitUploadPartInfo
+  @Override
+  public synchronized OmMultipartCommitUploadPartInfo
       getCommitUploadPartInfo() {
     return blockOutputStreamEntryPool.getCommitUploadPartInfo();
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index 7ce3f71b375..52e79ae5f12 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.ratis.util.function.CheckedRunnable;
 
 /**
  * OzoneDataStreamOutput is used to write data into Ozone.
@@ -100,40 +102,67 @@ public synchronized void close() throws IOException {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
-    if (keyDataStreamOutput != null) {
-      return keyDataStreamOutput.getCommitUploadPartInfo();
+    KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+    if (keyCommitOutput != null) {
+      return keyCommitOutput.getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
   }
 
   public KeyDataStreamOutput getKeyDataStreamOutput() {
+    if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
+      return ((KeyDataStreamOutput) byteBufferStreamOutput);
+    }
     if (byteBufferStreamOutput instanceof OzoneOutputStream) {
       OutputStream outputStream =
           ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
-      if (outputStream instanceof KeyDataStreamOutput) {
-        return ((KeyDataStreamOutput) outputStream);
-      } else if (outputStream instanceof CryptoOutputStream) {
-        OutputStream wrappedStream =
-            ((CryptoOutputStream) outputStream).getWrappedStream();
-        if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream);
-        }
-      } else if (outputStream instanceof CipherOutputStreamOzone) {
-        OutputStream wrappedStream =
-            ((CipherOutputStreamOzone) outputStream).getWrappedStream();
-        if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream);
-        }
+      OutputStream unwrappedStream = unwrap(outputStream);
+      if (unwrappedStream instanceof KeyDataStreamOutput) {
+        return ((KeyDataStreamOutput) unwrappedStream);
+      }
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  private KeyCommitOutput getKeyCommitOutput() {
+    if (byteBufferStreamOutput instanceof KeyCommitOutput) {
+      return (KeyCommitOutput) byteBufferStreamOutput;
+    }
+    if (byteBufferStreamOutput instanceof OzoneOutputStream) {
+      OutputStream outputStream =
+          ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
+      OutputStream unwrappedStream = unwrap(outputStream);
+      if (unwrappedStream instanceof KeyCommitOutput) {
+        return (KeyCommitOutput) unwrappedStream;
       }
-    } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
-      return ((KeyDataStreamOutput) byteBufferStreamOutput);
     }
     // Otherwise return null.
     return null;
   }
 
+  public void setPreCommits(
+      List<CheckedRunnable<IOException>> preCommits) {
+    KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+    if (keyCommitOutput != null) {
+      keyCommitOutput.setPreCommits(preCommits);
+      return;
+    }
+    throw new IllegalStateException(
+        "Output stream is not backed by KeyCommitOutput: " +
+            byteBufferStreamOutput.getClass());
+  }
+
+  private static OutputStream unwrap(OutputStream outputStream) {
+    if (outputStream instanceof CryptoOutputStream) {
+      return ((CryptoOutputStream) outputStream).getWrappedStream();
+    } else if (outputStream instanceof CipherOutputStreamOzone) {
+      return ((CipherOutputStreamOzone) outputStream).getWrappedStream();
+    }
+    return outputStream;
+  }
+
   @Override
   public void hflush() throws IOException {
     hsync();
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index 92b5d40b963..fa1cde66049 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -29,7 +29,6 @@
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
@@ -154,7 +153,7 @@ public static Pair<String, Long> putKeyWithStream(
         preCommits.add(checkSha256Hook);
       }
 
-      streamOutput.getKeyDataStreamOutput().setPreCommits(preCommits);
+      streamOutput.setPreCommits(preCommits);
     }
     return Pair.of(md5Hash, writeLen);
   }
@@ -237,13 +236,15 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
             writeToStreamOutput(streamOutput, body, chunkSize, length);
         eTag = DatatypeConverter.printHexBinary(
             
body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase();
+        List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
         String clientContentMD5 = 
headers.getHeaderString(S3Consts.CHECKSUM_HEADER);
         if (clientContentMD5 != null) {
           CheckedRunnable<IOException> checkContentMD5Hook = () -> {
             S3Utils.validateContentMD5(clientContentMD5, eTag, key);
           };
-          streamOutput.getKeyDataStreamOutput().setPreCommits(Collections.singletonList(checkContentMD5Hook));
+          preCommits.add(checkContentMD5Hook);
         }
+        streamOutput.setPreCommits(preCommits);
         ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, eTag);
         METRICS.incPutKeySuccessLength(putLength);
         perf.appendMetaLatencyNanos(metadataLatencyNs);
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 8a8864c398b..3b12b72540f 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -326,6 +326,12 @@ public OzoneDataStreamOutput createMultipartStreamKey(String key,
     if (multipartInfo == null || !multipartInfo.getUploadId().equals(uploadID)) {
       throw new OMException(ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
     } else {
+      if (isECMultipartUpload(multipartInfo)) {
+        OzoneOutputStream outputStream =
+            createMultipartKey(key, size, partNumber, uploadID);
+        return new OzoneDataStreamOutputStub(outputStream, key + size);
+      }
+
       ByteBufferStreamOutput byteBufferStreamOutput =
           new KeyMetadataAwareByteBufferStreamOutput(new HashMap<>()) {
             private final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
@@ -364,6 +370,12 @@ public void write(ByteBuffer b, int off, int len)
     }
   }
 
+  private boolean isECMultipartUpload(MultipartInfoStub multipartInfo) {
+    ReplicationConfig config = multipartInfo.getReplicationConfig();
+    return config != null &&
+        config.getReplicationType() == HddsProtos.ReplicationType.EC;
+  }
+
   @Override
   public OzoneInputStream readKey(String key) throws IOException {
     return new OzoneInputStream(new ByteArrayInputStream(keyContents.get(key)));
@@ -502,7 +514,8 @@ public OmMultipartInfo initiateMultipartUpload(String keyName,
       ReplicationConfig config, Map<String, String> metadata, Map<String, String> tags)
       throws IOException {
     String uploadID = UUID.randomUUID().toString();
-    keyToMultipartUpload.put(keyName, new MultipartInfoStub(uploadID, metadata, tags));
+    keyToMultipartUpload.put(keyName,
+        new MultipartInfoStub(uploadID, config, metadata, tags));
     return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID);
   }
 
@@ -911,12 +924,14 @@ public Map<String, String> getMetadata() {
   private static class MultipartInfoStub {
 
     private final String uploadId;
+    private final ReplicationConfig replicationConfig;
     private final Map<String, String> metadata;
     private final Map<String, String> tags;
 
-    MultipartInfoStub(String uploadId, Map<String, String> metadata,
-                      Map<String, String> tags) {
+    MultipartInfoStub(String uploadId, ReplicationConfig replicationConfig,
+                      Map<String, String> metadata, Map<String, String> tags) {
       this.uploadId = uploadId;
+      this.replicationConfig = replicationConfig;
       this.metadata = metadata;
       this.tags = tags;
     }
@@ -925,6 +940,10 @@ public String getUploadId() {
       return uploadId;
     }
 
+    public ReplicationConfig getReplicationConfig() {
+      return replicationConfig;
+    }
+
     public Map<String, String> getMetadata() {
       return metadata;
     }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index fa9562597a8..8f4cf0069d7 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.util.S3StorageType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.Parameter;
@@ -126,6 +127,42 @@ public void testPartUpload() throws Exception {
     }
   }
 
+  @Test
+  public void testPartUploadWithStandardIA() throws Exception {
+    when(headers.getHeaderString(STORAGE_CLASS_HEADER))
+        .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null);
+    String keyName = UUID.randomUUID().toString();
+    String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, keyName);
+
+    String content = "Multipart Upload";
+    try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1, uploadID, content)) {
+      assertNotNull(response.getHeaderString(OzoneConsts.ETAG));
+      assertEquals(200, response.getStatus());
+    }
+    assertContentLength(uploadID, keyName, content.length());
+  }
+
+  @Test
+  public void testPartUploadWithStandardIAAndContentMD5() throws Exception {
+    when(headers.getHeaderString(STORAGE_CLASS_HEADER))
+        .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null);
+    String content = "Multipart Upload Part";
+    byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
+    byte[] md5Bytes = MessageDigest.getInstance("MD5").digest(contentBytes);
+    String md5Base64 = Base64.getEncoder().encodeToString(md5Bytes);
+    when(headers.getHeaderString("Content-MD5")).thenReturn(md5Base64);
+
+    String keyName = UUID.randomUUID().toString();
+    String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, keyName);
+
+    try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1,
+        uploadID, content)) {
+      assertNotNull(response.getHeaderString(OzoneConsts.ETAG));
+      assertEquals(200, response.getStatus());
+    }
+    assertContentLength(uploadID, keyName, content.length());
+  }
+
   @Test
   public void testPartUploadWithIncorrectUploadID() {
     assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to