This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2529f3d595 HDDS-5869. Added support for stream on S3Gateway write path (#4970)
2529f3d595 is described below

commit 2529f3d5950c5c451852c9fd98cbd934e9164699
Author: hao guo <[email protected]>
AuthorDate: Wed Jun 28 12:27:30 2023 +0800

    HDDS-5869. Added support for stream on S3Gateway write path (#4970)
---
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  44 +++++++-
 .../ozone/s3/endpoint/ObjectEndpointStreaming.java | 112 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 4 deletions(-)

diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 5c02ae2cf7..fe51cf77f2 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.OptionalLong;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -93,6 +94,13 @@ import org.apache.commons.io.IOUtils;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT;
@@ -141,6 +149,9 @@ public class ObjectEndpoint extends EndpointBase {
   https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
   private Map<String, String> overrideQueryParameter;
   private int bufferSize;
+  private int chunkSize;
+  private boolean datastreamEnabled;
+  private long datastreamMinLength;
 
   public ObjectEndpoint() {
     overrideQueryParameter = ImmutableMap.<String, String>builder()
@@ -161,6 +172,16 @@ public class ObjectEndpoint extends EndpointBase {
     bufferSize = (int) ozoneConfiguration.getStorageSize(
         OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
         OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
+    chunkSize = (int) ozoneConfiguration.getStorageSize(
+        OZONE_SCM_CHUNK_SIZE_KEY,
+        OZONE_SCM_CHUNK_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    datastreamEnabled = ozoneConfiguration.getBoolean(
+        DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+        DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
+    datastreamMinLength = (long) ozoneConfiguration.getStorageSize(
+        OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+        OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
   }
 
   /**
@@ -203,6 +224,13 @@ public class ObjectEndpoint extends EndpointBase {
       ReplicationConfig replicationConfig =
           getReplicationConfig(bucket, storageType);
 
+      boolean enableEC = false;
+      if ((replicationConfig != null &&
+          replicationConfig.getReplicationType() == EC) ||
+          bucket.getReplicationConfig() instanceof ECReplicationConfig) {
+        enableEC = true;
+      }
+
       if (copyHeader != null) {
         //Copy object, as copy source available.
         s3GAction = S3GAction.COPY_OBJECT;
@@ -233,11 +261,19 @@ public class ObjectEndpoint extends EndpointBase {
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new SignedChunksInputStream(body);
       }
+      long putLength = 0;
+      if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
+        getMetrics().updatePutKeyMetadataStats(startNanos);
+        putLength = ObjectEndpointStreaming
+            .put(bucket, keyPath, length, replicationConfig, chunkSize,
+                customMetadata, body);
+      } else {
+        output = getClientProtocol().createKey(volume.getName(), bucketName,
+            keyPath, length, replicationConfig, customMetadata);
+        getMetrics().updatePutKeyMetadataStats(startNanos);
+        putLength = IOUtils.copyLarge(body, output);
+      }
 
-      output = getClientProtocol().createKey(volume.getName(), bucketName,
-          keyPath, length, replicationConfig, customMetadata);
-      getMetrics().updatePutKeyMetadataStats(startNanos);
-      long putLength = IOUtils.copyLarge(body, output);
       getMetrics().incPutKeySuccessLength(putLength);
       return Response.ok().status(HttpStatus.SC_OK)
           .build();
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
new file mode 100644
index 0000000000..cfe2984467
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
+
+/**
+ * Key level rest endpoints for Streaming.
+ */
+final class ObjectEndpointStreaming {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ObjectEndpointStreaming.class);
+
+  private ObjectEndpointStreaming() {
+  }
+
+  public static long put(OzoneBucket bucket, String keyPath,
+                         long length, ReplicationConfig replicationConfig,
+                         int chunkSize, Map<String, String> keyMetadata,
+                         InputStream body)
+      throws IOException, OS3Exception {
+
+    try {
+      return putKeyWithStream(bucket, keyPath,
+          length, chunkSize, replicationConfig, keyMetadata, body);
+    } catch (IOException ex) {
+      LOG.error("Exception occurred in PutObject", ex);
+      if (ex instanceof OMException) {
+        if (((OMException) ex).getResult() ==
+            OMException.ResultCodes.NOT_A_FILE) {
+          OS3Exception os3Exception = S3ErrorTable.newError(INVALID_REQUEST,
+              keyPath);
+          os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
+              "when calling the PutObject/MPU PartUpload operation: " +
+              OZONE_OM_ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" +
+              " considered as Unix Paths. Path has Violated FS Semantics " +
+              "which caused put operation to fail.");
+          throw os3Exception;
+        } else if ((((OMException) ex).getResult() ==
+            OMException.ResultCodes.PERMISSION_DENIED)) {
+          throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED, keyPath);
+        }
+      }
+      throw ex;
+    }
+  }
+
+  public static long putKeyWithStream(OzoneBucket bucket,
+                                      String keyPath,
+                                      long length,
+                                      int bufferSize,
+                                      ReplicationConfig replicationConfig,
+                                      Map<String, String> keyMetadata,
+                                      InputStream body)
+      throws IOException {
+    long writeLen = 0;
+    try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
+        length, replicationConfig, keyMetadata)) {
+      writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
+    }
+    return writeLen;
+  }
+
+  private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput,
+                                          InputStream body, int bufferSize,
+                                          long length)
+      throws IOException {
+    final byte[] buffer = new byte[bufferSize];
+    long n = 0;
+    while (n < length) {
+      final int toRead = Math.toIntExact(Math.min(bufferSize, length - n));
+      final int readLength = body.read(buffer, 0, toRead);
+      if (readLength == -1) {
+        break;
+      }
+      streamOutput.write(ByteBuffer.wrap(buffer, 0, readLength));
+      n += readLength;
+    }
+    return n;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to