This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ce4e44cf0 HDDS-8965. Stream multi part upload support on gateway 
(#5026)
7ce4e44cf0 is described below

commit 7ce4e44cf030b6c4bc976433edcea472690f450b
Author: hao guo <[email protected]>
AuthorDate: Fri Jul 14 19:02:52 2023 +0800

    HDDS-8965. Stream multi part upload support on gateway (#5026)
---
 .../ozone/client/io/OzoneDataStreamOutput.java     |   3 +
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  29 ++++-
 .../ozone/s3/endpoint/ObjectEndpointStreaming.java |  39 ++++++
 .../hadoop/ozone/client/OzoneBucketStub.java       |  95 ++++++++++++++
 .../ozone/client/OzoneDataStreamOutputStub.java    |  70 ++++++++++
 .../s3/endpoint/TestPartUploadWithStream.java      | 145 +++++++++++++++++++++
 6 files changed, 379 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index a1e4731a56..ee3f98ae78 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -71,6 +71,9 @@ public class OzoneDataStreamOutput extends 
ByteBufferOutputStream {
               .getCommitUploadPartInfo();
         }
       }
+    } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
+      return ((KeyDataStreamOutput)
+          byteBufferStreamOutput).getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index be0eaacffa..ad2211cd37 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -817,10 +817,29 @@ public class ObjectEndpoint extends EndpointBase {
         body = new SignedChunksInputStream(body);
       }
 
+      copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+      String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+      final OzoneBucket ozoneBucket = volume.getBucket(bucket);
+      ReplicationConfig replicationConfig =
+          getReplicationConfig(ozoneBucket, storageType);
+
+      boolean enableEC = false;
+      if ((replicationConfig != null &&
+          replicationConfig.getReplicationType()  == EC) ||
+          ozoneBucket.getReplicationConfig() instanceof ECReplicationConfig) {
+        enableEC = true;
+      }
+
       try {
+        if (datastreamEnabled && !enableEC && copyHeader == null) {
+          getMetrics().updatePutKeyMetadataStats(startNanos);
+          return ObjectEndpointStreaming
+              .createMultipartKey(ozoneBucket, key, length, partNumber,
+                  uploadID, chunkSize, body);
+        }
         ozoneOutputStream = getClientProtocol().createMultipartKey(
             volume.getName(), bucket, key, length, partNumber, uploadID);
-        copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+
         if (copyHeader != null) {
           Pair<String, String> result = parseSourceHeader(copyHeader);
 
@@ -1122,7 +1141,8 @@ public class ObjectEndpoint extends EndpointBase {
     }
   }
 
-  static boolean checkCopySourceModificationTime(Long lastModificationTime,
+  public static boolean checkCopySourceModificationTime(
+      Long lastModificationTime,
       String copySourceIfModifiedSinceStr,
       String copySourceIfUnmodifiedSinceStr) {
     long copySourceIfModifiedSince = Long.MIN_VALUE;
@@ -1147,4 +1167,9 @@ public class ObjectEndpoint extends EndpointBase {
   public void setOzoneConfiguration(OzoneConfiguration config) {
     this.ozoneConfiguration = config;
   }
+
+  @VisibleForTesting
+  public boolean isDatastreamEnabled() {
+    return datastreamEnabled;
+  }
 }
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index cfe2984467..5175bf8450 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -33,6 +36,7 @@ import java.util.Map;
 
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
 import static 
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
 
 /**
  * Key level rest endpoints for Streaming.
@@ -109,4 +113,39 @@ final class ObjectEndpointStreaming {
     }
     return n;
   }
+
+  public static Response createMultipartKey(OzoneBucket ozoneBucket, String 
key,
+                                            long length, int partNumber,
+                                            String uploadID, int chunkSize,
+                                            InputStream body)
+      throws IOException, OS3Exception {
+    OzoneDataStreamOutput streamOutput = null;
+    String eTag = "";
+    S3GatewayMetrics metrics = S3GatewayMetrics.create();
+    try {
+      streamOutput = ozoneBucket
+          .createMultipartStreamKey(key, length, partNumber, uploadID);
+      long putLength =
+          writeToStreamOutput(streamOutput, body, chunkSize, length);
+      metrics.incPutKeySuccessLength(putLength);
+    } catch (OMException ex) {
+      if (ex.getResult() ==
+          OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
+        throw S3ErrorTable.newError(NO_SUCH_UPLOAD,
+            uploadID);
+      } else if (ex.getResult() == OMException.ResultCodes.PERMISSION_DENIED) {
+        throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED,
+            ozoneBucket.getName() + "/" + key);
+      }
+      throw ex;
+    } finally {
+      if (streamOutput != null) {
+        streamOutput.close();
+        OmMultipartCommitUploadPartInfo commitUploadPartInfo =
+            streamOutput.getCommitUploadPartInfo();
+        eTag = commitUploadPartInfo.getPartName();
+      }
+    }
+    return Response.ok().header("ETag", eTag).build();
+  }
 }
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 34c9a46096..d613297fc0 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone.client;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -39,7 +40,9 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import 
org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts.PartInfo;
@@ -191,6 +194,98 @@ public class OzoneBucketStub extends OzoneBucket {
     return new OzoneOutputStream(byteArrayOutputStream, null);
   }
 
+  @Override
+  public OzoneDataStreamOutput createStreamKey(String key, long size,
+                                               ReplicationConfig rConfig,
+                                               Map<String, String> keyMetadata)
+      throws IOException {
+    ByteBufferStreamOutput byteBufferStreamOutput =
+        new ByteBufferStreamOutput() {
+
+          private final ByteBuffer buffer = ByteBuffer.allocate((int) size);
+
+          @Override
+          public void close() throws IOException {
+            buffer.flip();
+            byte[] bytes1 = new byte[buffer.remaining()];
+            buffer.get(bytes1);
+            keyContents.put(key, bytes1);
+            keyDetails.put(key, new OzoneKeyDetails(
+                getVolumeName(),
+                getName(),
+                key,
+                size,
+                System.currentTimeMillis(),
+                System.currentTimeMillis(),
+                new ArrayList<>(), rConfig, metadata, null,
+                null, false
+            ));
+          }
+
+          @Override
+          public void write(ByteBuffer b, int off, int len)
+              throws IOException {
+            byte[] bytes = new byte[len];
+            b.get(bytes, off, len);
+            buffer.put(bytes);
+          }
+
+          @Override
+          public void flush() throws IOException {
+          }
+        };
+
+    return new OzoneDataStreamOutputStub(byteBufferStreamOutput, key + size);
+  }
+
+  @Override
+  public OzoneDataStreamOutput createMultipartStreamKey(String key,
+                                                        long size,
+                                                        int partNumber,
+                                                        String uploadID)
+      throws IOException {
+    String multipartUploadID = multipartUploadIdMap.get(key);
+    if (multipartUploadID == null || !multipartUploadID.equals(uploadID)) {
+      throw new OMException(ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+    } else {
+      ByteBufferStreamOutput byteBufferStreamOutput =
+          new ByteBufferStreamOutput() {
+            private final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+
+            @Override
+            public void close() throws IOException {
+              int position = buffer.position();
+              buffer.flip();
+              byte[] bytes = new byte[position];
+              buffer.get(bytes);
+
+              Part part = new Part(key + size, bytes);
+              if (partList.get(key) == null) {
+                Map<Integer, Part> parts = new TreeMap<>();
+                parts.put(partNumber, part);
+                partList.put(key, parts);
+              } else {
+                partList.get(key).put(partNumber, part);
+              }
+            }
+
+            @Override
+            public void write(ByteBuffer b, int off, int len)
+                throws IOException {
+              byte[] bytes = new byte[len];
+              b.get(bytes, off, len);
+              buffer.put(bytes);
+            }
+
+            @Override
+            public void flush() throws IOException {
+            }
+          };
+
+      return new OzoneDataStreamOutputStub(byteBufferStreamOutput, key + size);
+    }
+  }
+
   @Override
   public OzoneInputStream readKey(String key) throws IOException {
     return new OzoneInputStream(new 
ByteArrayInputStream(keyContents.get(key)));
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java
new file mode 100644
index 0000000000..7bb35682d8
--- /dev/null
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneDataStreamOutputStub.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * OzoneDataStreamOutput stub for testing.
+ */
+public class OzoneDataStreamOutputStub extends OzoneDataStreamOutput {
+
+  private final String partName;
+  private boolean closed = false;
+
+  /**
+   * Constructs OzoneDataStreamOutputStub with streamOutput and partName.
+   */
+  public OzoneDataStreamOutputStub(
+      ByteBufferStreamOutput byteBufferStreamOutput,
+      String partName) {
+    super(byteBufferStreamOutput);
+    this.partName = partName;
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    getByteBufStreamOutput().write(b, off, len);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    getByteBufStreamOutput().flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      getByteBufStreamOutput().close();
+      closed = true;
+    }
+  }
+
+  @Override
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return closed ? new OmMultipartCommitUploadPartInfo(partName) : null;
+  }
+}
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUploadWithStream.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUploadWithStream.java
new file mode 100644
index 0000000000..17ae963e11
--- /dev/null
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUploadWithStream.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import java.io.ByteArrayInputStream;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class tests Upload part request.
+ */
+public class TestPartUploadWithStream {
+
+  private static final ObjectEndpoint REST = new ObjectEndpoint();
+
+  private static final String S3BUCKET = "streampartb1";
+  private static final String S3KEY = "testkey";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    OzoneClient client = new OzoneClientStub();
+    client.getObjectStore().createS3Bucket(S3BUCKET);
+
+
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("STANDARD");
+
+    REST.setHeaders(headers);
+    REST.setClient(client);
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+        true);
+    REST.setOzoneConfiguration(conf);
+    REST.init();
+  }
+
+  @Test
+  public void testEnableStream() {
+    assertTrue(REST.isDatastreamEnabled());
+  }
+
+  @Test
+  public void testPartUpload() throws Exception {
+
+    Response response = REST.initializeMultipartUpload(S3BUCKET, S3KEY);
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+    String uploadID = multipartUploadInitiateResponse.getUploadID();
+
+    assertEquals(200, response.getStatus());
+
+    String content = "Multipart Upload";
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(content.getBytes(UTF_8));
+    response = REST.put(S3BUCKET, S3KEY,
+        content.length(), 1, uploadID, body);
+
+    assertNotNull(response.getHeaderString("ETag"));
+
+  }
+
+  @Test
+  public void testPartUploadWithOverride() throws Exception {
+
+    Response response = REST.initializeMultipartUpload(S3BUCKET, S3KEY);
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+    String uploadID = multipartUploadInitiateResponse.getUploadID();
+
+    assertEquals(200, response.getStatus());
+
+    String content = "Multipart Upload";
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(content.getBytes(UTF_8));
+    response = REST.put(S3BUCKET, S3KEY,
+        content.length(), 1, uploadID, body);
+
+    assertNotNull(response.getHeaderString("ETag"));
+
+    String eTag = response.getHeaderString("ETag");
+
+    // Upload part again with same part Number, the ETag should be changed.
+    content = "Multipart Upload Changed";
+    response = REST.put(S3BUCKET, S3KEY,
+        content.length(), 1, uploadID, body);
+    assertNotNull(response.getHeaderString("ETag"));
+    assertNotEquals(eTag, response.getHeaderString("ETag"));
+
+  }
+
+  @Test
+  public void testPartUploadWithIncorrectUploadID() throws Exception {
+    try {
+      String content = "Multipart Upload With Incorrect uploadID";
+      ByteArrayInputStream body =
+          new ByteArrayInputStream(content.getBytes(UTF_8));
+      REST.put(S3BUCKET, S3KEY, content.length(), 1,
+          "random", body);
+      fail("testPartUploadWithIncorrectUploadID failed");
+    } catch (OS3Exception ex) {
+      assertEquals("NoSuchUpload", ex.getCode());
+      assertEquals(HTTP_NOT_FOUND, ex.getHttpCode());
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to