This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7ce4e44cf0 HDDS-8965. Stream multi part upload support on gateway
(#5026)
7ce4e44cf0 is described below
commit 7ce4e44cf030b6c4bc976433edcea472690f450b
Author: hao guo <[email protected]>
AuthorDate: Fri Jul 14 19:02:52 2023 +0800
HDDS-8965. Stream multi part upload support on gateway (#5026)
---
.../ozone/client/io/OzoneDataStreamOutput.java | 3 +
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 29 ++++-
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 39 ++++++
.../hadoop/ozone/client/OzoneBucketStub.java | 95 ++++++++++++++
.../ozone/client/OzoneDataStreamOutputStub.java | 70 ++++++++++
.../s3/endpoint/TestPartUploadWithStream.java | 145 +++++++++++++++++++++
6 files changed, 379 insertions(+), 2 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index a1e4731a56..ee3f98ae78 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -71,6 +71,9 @@ public class OzoneDataStreamOutput extends
ByteBufferOutputStream {
.getCommitUploadPartInfo();
}
}
+ } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
+ return ((KeyDataStreamOutput)
+ byteBufferStreamOutput).getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index be0eaacffa..ad2211cd37 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -817,10 +817,29 @@ public class ObjectEndpoint extends EndpointBase {
body = new SignedChunksInputStream(body);
}
+ copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+ String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+ final OzoneBucket ozoneBucket = volume.getBucket(bucket);
+ ReplicationConfig replicationConfig =
+ getReplicationConfig(ozoneBucket, storageType);
+
+ boolean enableEC = false;
+ if ((replicationConfig != null &&
+ replicationConfig.getReplicationType() == EC) ||
+ ozoneBucket.getReplicationConfig() instanceof ECReplicationConfig) {
+ enableEC = true;
+ }
+
try {
+ if (datastreamEnabled && !enableEC && copyHeader == null) {
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ return ObjectEndpointStreaming
+ .createMultipartKey(ozoneBucket, key, length, partNumber,
+ uploadID, chunkSize, body);
+ }
ozoneOutputStream = getClientProtocol().createMultipartKey(
volume.getName(), bucket, key, length, partNumber, uploadID);
- copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+
if (copyHeader != null) {
Pair<String, String> result = parseSourceHeader(copyHeader);
@@ -1122,7 +1141,8 @@ public class ObjectEndpoint extends EndpointBase {
}
}
- static boolean checkCopySourceModificationTime(Long lastModificationTime,
+ public static boolean checkCopySourceModificationTime(
+ Long lastModificationTime,
String copySourceIfModifiedSinceStr,
String copySourceIfUnmodifiedSinceStr) {
long copySourceIfModifiedSince = Long.MIN_VALUE;
@@ -1147,4 +1167,9 @@ public class ObjectEndpoint extends EndpointBase {
public void setOzoneConfiguration(OzoneConfiguration config) {
this.ozoneConfiguration = config;
}
+
+ @VisibleForTesting
+ public boolean isDatastreamEnabled() {
+ return datastreamEnabled;
+ }
}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index cfe2984467..5175bf8450 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -33,6 +36,7 @@ import java.util.Map;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
/**
* Key level rest endpoints for Streaming.
@@ -109,4 +113,39 @@ final class ObjectEndpointStreaming {
}
return n;
}
+
+ public static Response createMultipartKey(OzoneBucket ozoneBucket, String
key,
+ long length, int partNumber,
+ String uploadID, int chunkSize,
+ InputStream body)
+ throws IOException, OS3Exception {
+ OzoneDataStreamOutput streamOutput = null;
+ String eTag = "";
+ S3GatewayMetrics metrics = S3GatewayMetrics.create();
+ try {
+ streamOutput = ozoneBucket
+ .createMultipartStreamKey(key, length, partNumber, uploadID);
+ long putLength =
+ writeToStreamOutput(streamOutput, body, chunkSize, length);
+ metrics.incPutKeySuccessLength(putLength);
+ } catch (OMException ex) {
+ if (ex.getResult() ==
+ OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
+ throw S3ErrorTable.newError(NO_SUCH_UPLOAD,
+ uploadID);
+ } else if (ex.getResult() == OMException.ResultCodes.PERMISSION_DENIED) {
+ throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED,
+ ozoneBucket.getName() + "/" + key);
+ }
+ throw ex;
+ } finally {
+ if (streamOutput != null) {
+ streamOutput.close();
+ OmMultipartCommitUploadPartInfo commitUploadPartInfo =
+ streamOutput.getCommitUploadPartInfo();
+ eTag = commitUploadPartInfo.getPartName();
+ }
+ }
+ return Response.ok().header("ETag", eTag).build();
+ }
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 34c9a46096..d613297fc0 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -39,7 +40,9 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import
org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts.PartInfo;
@@ -191,6 +194,98 @@ public class OzoneBucketStub extends OzoneBucket {
return new OzoneOutputStream(byteArrayOutputStream, null);
}
+ @Override
+ public OzoneDataStreamOutput createStreamKey(String key, long size,
+ ReplicationConfig rConfig,
+ Map<String, String> keyMetadata)
+ throws IOException {
+ ByteBufferStreamOutput byteBufferStreamOutput =
+ new ByteBufferStreamOutput() {
+
+ private final ByteBuffer buffer = ByteBuffer.allocate((int) size);
+
+ @Override
+ public void close() throws IOException {
+ buffer.flip();
+ byte[] bytes1 = new byte[buffer.remaining()];
+ buffer.get(bytes1);
+ keyContents.put(key, bytes1);
+ keyDetails.put(key, new OzoneKeyDetails(
+ getVolumeName(),
+ getName(),
+ key,
+ size,
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ new ArrayList<>(), rConfig, metadata, null,
+ null, false
+ ));
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len)
+ throws IOException {
+ byte[] bytes = new byte[len];
+ b.get(bytes, off, len);
+ buffer.put(bytes);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+ };
+
+ return new OzoneDataStreamOutputStub(byteBufferStreamOutput, key + size);
+ }
+
+ @Override
+ public OzoneDataStreamOutput createMultipartStreamKey(String key,
+ long size,
+ int partNumber,
+ String uploadID)
+ throws IOException {
+ String multipartUploadID = multipartUploadIdMap.get(key);
+ if (multipartUploadID == null || !multipartUploadID.equals(uploadID)) {
+ throw new OMException(ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+ } else {
+ ByteBufferStreamOutput byteBufferStreamOutput =
+ new ByteBufferStreamOutput() {
+ private final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+
+ @Override
+ public void close() throws IOException {
+ int position = buffer.position();
+ buffer.flip();
+ byte[] bytes = new byte[position];
+ buffer.get(bytes);
+
+ Part part = new Part(key + size, bytes);
+ if (partList.get(key) == null) {
+ Map<Integer, Part> parts = new TreeMap<>();
+ parts.put(partNumber, part);
+ partList.put(key, parts);
+ } else {
+ partList.get(key).put(partNumber, part);
+ }
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len)
+ throws IOException {
+ byte[] bytes = new byte[len];
+ b.get(bytes, off, len);
+ buffer.put(bytes);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+ };
+
+ return new OzoneDataStreamOutputStub(byteBufferStreamOutput, key + size);
+ }
+ }
+
@Override
public OzoneInputStream readKey(String key) throws IOException {
return new OzoneInputStream(new
ByteArrayInputStream(keyContents.get(key)));
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java
new file mode 100644
index 0000000000..7bb35682d8
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * OzoneDataStreamOutput stub for testing.
+ */
+public class OzoneDataStreamOutputStub extends OzoneDataStreamOutput {
+
+ private final String partName;
+ private boolean closed = false;
+
+ /**
+ * Constructs OzoneDataStreamOutputStub with streamOutput and partName.
+ */
+ public OzoneDataStreamOutputStub(
+ ByteBufferStreamOutput byteBufferStreamOutput,
+ String partName) {
+ super(byteBufferStreamOutput);
+ this.partName = partName;
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ getByteBufStreamOutput().write(b, off, len);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ getByteBufStreamOutput().flush();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ getByteBufStreamOutput().close();
+ closed = true;
+ }
+ }
+
+ @Override
+ public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+ return closed ? new OmMultipartCommitUploadPartInfo(partName) : null;
+ }
+}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUploadWithStream.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUploadWithStream.java
new file mode 100644
index 0000000000..17ae963e11
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUploadWithStream.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import java.io.ByteArrayInputStream;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class tests Upload part request.
+ */
+public class TestPartUploadWithStream {
+
+ private static final ObjectEndpoint REST = new ObjectEndpoint();
+
+ private static final String S3BUCKET = "streampartb1";
+ private static final String S3KEY = "testkey";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ OzoneClient client = new OzoneClientStub();
+ client.getObjectStore().createS3Bucket(S3BUCKET);
+
+
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("STANDARD");
+
+ REST.setHeaders(headers);
+ REST.setClient(client);
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+ true);
+ REST.setOzoneConfiguration(conf);
+ REST.init();
+ }
+
+ @Test
+ public void testEnableStream() {
+ assertTrue(REST.isDatastreamEnabled());
+ }
+
+ @Test
+ public void testPartUpload() throws Exception {
+
+ Response response = REST.initializeMultipartUpload(S3BUCKET, S3KEY);
+ MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+ (MultipartUploadInitiateResponse) response.getEntity();
+ assertNotNull(multipartUploadInitiateResponse.getUploadID());
+ String uploadID = multipartUploadInitiateResponse.getUploadID();
+
+ assertEquals(200, response.getStatus());
+
+ String content = "Multipart Upload";
+ ByteArrayInputStream body =
+ new ByteArrayInputStream(content.getBytes(UTF_8));
+ response = REST.put(S3BUCKET, S3KEY,
+ content.length(), 1, uploadID, body);
+
+ assertNotNull(response.getHeaderString("ETag"));
+
+ }
+
+ @Test
+ public void testPartUploadWithOverride() throws Exception {
+
+ Response response = REST.initializeMultipartUpload(S3BUCKET, S3KEY);
+ MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+ (MultipartUploadInitiateResponse) response.getEntity();
+ assertNotNull(multipartUploadInitiateResponse.getUploadID());
+ String uploadID = multipartUploadInitiateResponse.getUploadID();
+
+ assertEquals(200, response.getStatus());
+
+ String content = "Multipart Upload";
+ ByteArrayInputStream body =
+ new ByteArrayInputStream(content.getBytes(UTF_8));
+ response = REST.put(S3BUCKET, S3KEY,
+ content.length(), 1, uploadID, body);
+
+ assertNotNull(response.getHeaderString("ETag"));
+
+ String eTag = response.getHeaderString("ETag");
+
+ // Upload part again with same part Number, the ETag should be changed.
+ content = "Multipart Upload Changed";
+ response = REST.put(S3BUCKET, S3KEY,
+ content.length(), 1, uploadID, body);
+ assertNotNull(response.getHeaderString("ETag"));
+ assertNotEquals(eTag, response.getHeaderString("ETag"));
+
+ }
+
+ @Test
+ public void testPartUploadWithIncorrectUploadID() throws Exception {
+ try {
+ String content = "Multipart Upload With Incorrect uploadID";
+ ByteArrayInputStream body =
+ new ByteArrayInputStream(content.getBytes(UTF_8));
+ REST.put(S3BUCKET, S3KEY, content.length(), 1,
+ "random", body);
+ fail("testPartUploadWithIncorrectUploadID failed");
+ } catch (OS3Exception ex) {
+ assertEquals("NoSuchUpload", ex.getCode());
+ assertEquals(HTTP_NOT_FOUND, ex.getHttpCode());
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]