This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2529f3d595 HDDS-5869. Added support for stream on S3Gateway write path
(#4970)
2529f3d595 is described below
commit 2529f3d5950c5c451852c9fd98cbd934e9164699
Author: hao guo <[email protected]>
AuthorDate: Wed Jun 28 12:27:30 2023 +0800
HDDS-5869. Added support for stream on S3Gateway write path (#4970)
---
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 44 +++++++-
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 112 +++++++++++++++++++++
2 files changed, 152 insertions(+), 4 deletions(-)
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 5c02ae2cf7..fe51cf77f2 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.OptionalLong;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -93,6 +94,13 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT;
@@ -141,6 +149,9 @@ public class ObjectEndpoint extends EndpointBase {
https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
private Map<String, String> overrideQueryParameter;
private int bufferSize;
+ private int chunkSize;
+ private boolean datastreamEnabled;
+ private long datastreamMinLength;
public ObjectEndpoint() {
overrideQueryParameter = ImmutableMap.<String, String>builder()
@@ -161,6 +172,16 @@ public class ObjectEndpoint extends EndpointBase {
bufferSize = (int) ozoneConfiguration.getStorageSize(
OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
+ chunkSize = (int) ozoneConfiguration.getStorageSize(
+ OZONE_SCM_CHUNK_SIZE_KEY,
+ OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ datastreamEnabled = ozoneConfiguration.getBoolean(
+ DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+ DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
+ datastreamMinLength = (long) ozoneConfiguration.getStorageSize(
+ OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+ OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
}
/**
@@ -203,6 +224,13 @@ public class ObjectEndpoint extends EndpointBase {
ReplicationConfig replicationConfig =
getReplicationConfig(bucket, storageType);
+ boolean enableEC = false;
+ if ((replicationConfig != null &&
+ replicationConfig.getReplicationType() == EC) ||
+ bucket.getReplicationConfig() instanceof ECReplicationConfig) {
+ enableEC = true;
+ }
+
if (copyHeader != null) {
//Copy object, as copy source available.
s3GAction = S3GAction.COPY_OBJECT;
@@ -233,11 +261,19 @@ public class ObjectEndpoint extends EndpointBase {
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new SignedChunksInputStream(body);
}
+ long putLength = 0;
+ if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ putLength = ObjectEndpointStreaming
+ .put(bucket, keyPath, length, replicationConfig, chunkSize,
+ customMetadata, body);
+ } else {
+ output = getClientProtocol().createKey(volume.getName(), bucketName,
+ keyPath, length, replicationConfig, customMetadata);
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ putLength = IOUtils.copyLarge(body, output);
+ }
- output = getClientProtocol().createKey(volume.getName(), bucketName,
- keyPath, length, replicationConfig, customMetadata);
- getMetrics().updatePutKeyMetadataStats(startNanos);
- long putLength = IOUtils.copyLarge(body, output);
getMetrics().incPutKeySuccessLength(putLength);
return Response.ok().status(HttpStatus.SC_OK)
.build();
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
new file mode 100644
index 0000000000..cfe2984467
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
+import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
+
+/**
+ * Key level rest endpoints for Streaming.
+ */
+final class ObjectEndpointStreaming {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ObjectEndpointStreaming.class);
+
+ private ObjectEndpointStreaming() {
+ }
+
+ public static long put(OzoneBucket bucket, String keyPath,
+ long length, ReplicationConfig replicationConfig,
+ int chunkSize, Map<String, String> keyMetadata,
+ InputStream body)
+ throws IOException, OS3Exception {
+
+ try {
+ return putKeyWithStream(bucket, keyPath,
+ length, chunkSize, replicationConfig, keyMetadata, body);
+ } catch (IOException ex) {
+ LOG.error("Exception occurred in PutObject", ex);
+ if (ex instanceof OMException) {
+ if (((OMException) ex).getResult() ==
+ OMException.ResultCodes.NOT_A_FILE) {
+ OS3Exception os3Exception = S3ErrorTable.newError(INVALID_REQUEST,
+ keyPath);
+ os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
+ "when calling the PutObject/MPU PartUpload operation: " +
+ OZONE_OM_ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" +
+ " considered as Unix Paths. Path has Violated FS Semantics " +
+ "which caused put operation to fail.");
+ throw os3Exception;
+ } else if ((((OMException) ex).getResult() ==
+ OMException.ResultCodes.PERMISSION_DENIED)) {
+ throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED, keyPath);
+ }
+ }
+ throw ex;
+ }
+ }
+
+ public static long putKeyWithStream(OzoneBucket bucket,
+ String keyPath,
+ long length,
+ int bufferSize,
+ ReplicationConfig replicationConfig,
+ Map<String, String> keyMetadata,
+ InputStream body)
+ throws IOException {
+ long writeLen = 0;
+ try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
+ length, replicationConfig, keyMetadata)) {
+ writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
+ }
+ return writeLen;
+ }
+
+ private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput,
+ InputStream body, int bufferSize,
+ long length)
+ throws IOException {
+ final byte[] buffer = new byte[bufferSize];
+ long n = 0;
+ while (n < length) {
+ final int toRead = Math.toIntExact(Math.min(bufferSize, length - n));
+ final int readLength = body.read(buffer, 0, toRead);
+ if (readLength == -1) {
+ break;
+ }
+ streamOutput.write(ByteBuffer.wrap(buffer, 0, readLength));
+ n += readLength;
+ }
+ return n;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]