This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f646b1e9bf2 HDDS-15213. Use common commit output for stream outputs
(#10224)
f646b1e9bf2 is described below
commit f646b1e9bf2c00264ea812cfe3a486bc48ce2d5a
Author: Peter Lee <[email protected]>
AuthorDate: Sun May 17 21:09:59 2026 +0800
HDDS-15213. Use common commit output for stream outputs (#10224)
---
.../hadoop/ozone/client/io/KeyCommitOutput.java | 35 +++++++++++
.../ozone/client/io/KeyDataStreamOutput.java | 4 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 6 +-
.../ozone/client/io/OzoneDataStreamOutput.java | 67 ++++++++++++++++------
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 7 ++-
.../hadoop/ozone/client/OzoneBucketStub.java | 25 +++++++-
.../hadoop/ozone/s3/endpoint/TestPartUpload.java | 37 ++++++++++++
7 files changed, 153 insertions(+), 28 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java
new file mode 100644
index 00000000000..32a1638f050
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyCommitOutput.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.ratis.util.function.CheckedRunnable;
+
+/**
+ * Common commit-time behavior for key output implementations.
+ */
+interface KeyCommitOutput extends KeyMetadataAware {
+
+ void setPreCommits(
+ @Nonnull List<CheckedRunnable<IOException>> preCommits);
+
+ OmMultipartCommitUploadPartInfo getCommitUploadPartInfo();
+}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index ceacd624e93..7556f1e6d76 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -60,7 +60,7 @@
* TODO : currently not support multi-thread access.
*/
public class KeyDataStreamOutput extends AbstractDataStreamOutput
- implements KeyMetadataAware {
+ implements KeyCommitOutput {
private static final Logger LOG =
LoggerFactory.getLogger(KeyDataStreamOutput.class);
@@ -87,6 +87,7 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
private List<CheckedRunnable<IOException>> preCommits =
Collections.emptyList();
+ @Override
public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>>
preCommits) {
this.preCommits = preCommits;
}
@@ -472,6 +473,7 @@ public void close() throws IOException {
}
}
+ @Override
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
return blockDataStreamOutputEntryPool.getCommitUploadPartInfo();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 2f9edfa94ea..b42e150ecff 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -76,7 +76,7 @@
* TODO : currently not support multi-thread access.
*/
public class KeyOutputStream extends OutputStream
- implements Syncable, KeyMetadataAware {
+ implements Syncable, KeyCommitOutput {
private static final Logger LOG =
LoggerFactory.getLogger(KeyOutputStream.class);
@@ -114,6 +114,7 @@ public class KeyOutputStream extends OutputStream
private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
private List<CheckedRunnable<IOException>> preCommits =
Collections.emptyList();
+ @Override
public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>>
preCommits) {
this.preCommits = preCommits;
}
@@ -671,7 +672,8 @@ private void closeInternal() throws IOException {
}
}
- synchronized OmMultipartCommitUploadPartInfo
+ @Override
+ public synchronized OmMultipartCommitUploadPartInfo
getCommitUploadPartInfo() {
return blockOutputStreamEntryPool.getCommitUploadPartInfo();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index 7ce3f71b375..52e79ae5f12 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -27,6 +28,7 @@
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.ratis.util.function.CheckedRunnable;
/**
* OzoneDataStreamOutput is used to write data into Ozone.
@@ -100,40 +102,67 @@ public synchronized void close() throws IOException {
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
- KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
- if (keyDataStreamOutput != null) {
- return keyDataStreamOutput.getCommitUploadPartInfo();
+ KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+ if (keyCommitOutput != null) {
+ return keyCommitOutput.getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
}
public KeyDataStreamOutput getKeyDataStreamOutput() {
+ if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
+ return ((KeyDataStreamOutput) byteBufferStreamOutput);
+ }
if (byteBufferStreamOutput instanceof OzoneOutputStream) {
OutputStream outputStream =
((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
- if (outputStream instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput) outputStream);
- } else if (outputStream instanceof CryptoOutputStream) {
- OutputStream wrappedStream =
- ((CryptoOutputStream) outputStream).getWrappedStream();
- if (wrappedStream instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput) wrappedStream);
- }
- } else if (outputStream instanceof CipherOutputStreamOzone) {
- OutputStream wrappedStream =
- ((CipherOutputStreamOzone) outputStream).getWrappedStream();
- if (wrappedStream instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput) wrappedStream);
- }
+ OutputStream unwrappedStream = unwrap(outputStream);
+ if (unwrappedStream instanceof KeyDataStreamOutput) {
+ return ((KeyDataStreamOutput) unwrappedStream);
+ }
+ }
+ // Otherwise return null.
+ return null;
+ }
+
+ private KeyCommitOutput getKeyCommitOutput() {
+ if (byteBufferStreamOutput instanceof KeyCommitOutput) {
+ return (KeyCommitOutput) byteBufferStreamOutput;
+ }
+ if (byteBufferStreamOutput instanceof OzoneOutputStream) {
+ OutputStream outputStream =
+ ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
+ OutputStream unwrappedStream = unwrap(outputStream);
+ if (unwrappedStream instanceof KeyCommitOutput) {
+ return (KeyCommitOutput) unwrappedStream;
}
- } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput) byteBufferStreamOutput);
}
// Otherwise return null.
return null;
}
+ public void setPreCommits(
+ List<CheckedRunnable<IOException>> preCommits) {
+ KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+ if (keyCommitOutput != null) {
+ keyCommitOutput.setPreCommits(preCommits);
+ return;
+ }
+ throw new IllegalStateException(
+ "Output stream is not backed by KeyCommitOutput: " +
+ byteBufferStreamOutput.getClass());
+ }
+
+ private static OutputStream unwrap(OutputStream outputStream) {
+ if (outputStream instanceof CryptoOutputStream) {
+ return ((CryptoOutputStream) outputStream).getWrappedStream();
+ } else if (outputStream instanceof CipherOutputStreamOzone) {
+ return ((CipherOutputStreamOzone) outputStream).getWrappedStream();
+ }
+ return outputStream;
+ }
+
@Override
public void hflush() throws IOException {
hsync();
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index 92b5d40b963..fa1cde66049 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -29,7 +29,6 @@
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
@@ -154,7 +153,7 @@ public static Pair<String, Long> putKeyWithStream(
preCommits.add(checkSha256Hook);
}
- streamOutput.getKeyDataStreamOutput().setPreCommits(preCommits);
+ streamOutput.setPreCommits(preCommits);
}
return Pair.of(md5Hash, writeLen);
}
@@ -237,13 +236,15 @@ public static Response createMultipartKey(OzoneBucket
ozoneBucket, String key,
writeToStreamOutput(streamOutput, body, chunkSize, length);
eTag = DatatypeConverter.printHexBinary(
body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase();
+ List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
String clientContentMD5 =
headers.getHeaderString(S3Consts.CHECKSUM_HEADER);
if (clientContentMD5 != null) {
CheckedRunnable<IOException> checkContentMD5Hook = () -> {
S3Utils.validateContentMD5(clientContentMD5, eTag, key);
};
- streamOutput.getKeyDataStreamOutput().setPreCommits(Collections.singletonList(checkContentMD5Hook));
+ preCommits.add(checkContentMD5Hook);
}
+ streamOutput.setPreCommits(preCommits);
((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG,
eTag);
METRICS.incPutKeySuccessLength(putLength);
perf.appendMetaLatencyNanos(metadataLatencyNs);
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 8a8864c398b..3b12b72540f 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -326,6 +326,12 @@ public OzoneDataStreamOutput
createMultipartStreamKey(String key,
if (multipartInfo == null ||
!multipartInfo.getUploadId().equals(uploadID)) {
throw new OMException(ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
} else {
+ if (isECMultipartUpload(multipartInfo)) {
+ OzoneOutputStream outputStream =
+ createMultipartKey(key, size, partNumber, uploadID);
+ return new OzoneDataStreamOutputStub(outputStream, key + size);
+ }
+
ByteBufferStreamOutput byteBufferStreamOutput =
new KeyMetadataAwareByteBufferStreamOutput(new HashMap<>()) {
private final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
@@ -364,6 +370,12 @@ public void write(ByteBuffer b, int off, int len)
}
}
+ private boolean isECMultipartUpload(MultipartInfoStub multipartInfo) {
+ ReplicationConfig config = multipartInfo.getReplicationConfig();
+ return config != null &&
+ config.getReplicationType() == HddsProtos.ReplicationType.EC;
+ }
+
@Override
public OzoneInputStream readKey(String key) throws IOException {
return new OzoneInputStream(new
ByteArrayInputStream(keyContents.get(key)));
@@ -502,7 +514,8 @@ public OmMultipartInfo initiateMultipartUpload(String
keyName,
ReplicationConfig config, Map<String, String> metadata, Map<String,
String> tags)
throws IOException {
String uploadID = UUID.randomUUID().toString();
- keyToMultipartUpload.put(keyName, new MultipartInfoStub(uploadID, metadata, tags));
+ keyToMultipartUpload.put(keyName,
+ new MultipartInfoStub(uploadID, config, metadata, tags));
return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID);
}
@@ -911,12 +924,14 @@ public Map<String, String> getMetadata() {
private static class MultipartInfoStub {
private final String uploadId;
+ private final ReplicationConfig replicationConfig;
private final Map<String, String> metadata;
private final Map<String, String> tags;
- MultipartInfoStub(String uploadId, Map<String, String> metadata,
- Map<String, String> tags) {
+ MultipartInfoStub(String uploadId, ReplicationConfig replicationConfig,
+ Map<String, String> metadata, Map<String, String> tags) {
this.uploadId = uploadId;
+ this.replicationConfig = replicationConfig;
this.metadata = metadata;
this.tags = tags;
}
@@ -925,6 +940,10 @@ public String getUploadId() {
return uploadId;
}
+ public ReplicationConfig getReplicationConfig() {
+ return replicationConfig;
+ }
+
public Map<String, String> getMetadata() {
return metadata;
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index fa9562597a8..8f4cf0069d7 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.util.S3StorageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.Parameter;
@@ -126,6 +127,42 @@ public void testPartUpload() throws Exception {
}
}
+ @Test
+ public void testPartUploadWithStandardIA() throws Exception {
+ when(headers.getHeaderString(STORAGE_CLASS_HEADER))
+ .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null);
+ String keyName = UUID.randomUUID().toString();
+ String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, keyName);
+
+ String content = "Multipart Upload";
+ try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1, uploadID, content)) {
+ assertNotNull(response.getHeaderString(OzoneConsts.ETAG));
+ assertEquals(200, response.getStatus());
+ }
+ assertContentLength(uploadID, keyName, content.length());
+ }
+
+ @Test
+ public void testPartUploadWithStandardIAAndContentMD5() throws Exception {
+ when(headers.getHeaderString(STORAGE_CLASS_HEADER))
+ .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null);
+ String content = "Multipart Upload Part";
+ byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
+ byte[] md5Bytes = MessageDigest.getInstance("MD5").digest(contentBytes);
+ String md5Base64 = Base64.getEncoder().encodeToString(md5Bytes);
+ when(headers.getHeaderString("Content-MD5")).thenReturn(md5Base64);
+
+ String keyName = UUID.randomUUID().toString();
+ String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET, keyName);
+
+ try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1,
+ uploadID, content)) {
+ assertNotNull(response.getHeaderString(OzoneConsts.ETAG));
+ assertEquals(200, response.getStatus());
+ }
+ assertContentLength(uploadID, keyName, content.length());
+ }
+
@Test
public void testPartUploadWithIncorrectUploadID() {
assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]