This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new eb0836e6ac HDDS-9201. [Ozone-Streaming] Stream copy object support on 
gateway (#5210)
eb0836e6ac is described below

commit eb0836e6acb1fa61f2f0e497a038ab795cdee200
Author: hao guo <[email protected]>
AuthorDate: Thu Sep 7 12:27:20 2023 +0800

    HDDS-9201. [Ozone-Streaming] Stream copy object support on gateway (#5210)
    
    * HDDS-9201. Stream copy object support on gateway
---
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  17 ++-
 .../hadoop/ozone/client/OzoneBucketStub.java       |   6 +-
 .../ozone/s3/endpoint/TestUploadWithStream.java    | 150 +++++++++++++++++++++
 3 files changed, 167 insertions(+), 6 deletions(-)

diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 28c27977ac..5ef67f3c6f 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -1010,11 +1010,18 @@ public class ObjectEndpoint extends EndpointBase {
       ReplicationConfig replication,
             Map<String, String> metadata) throws IOException {
     long copyLength;
-    try (OzoneOutputStream dest =
-                 getClientProtocol().createKey(
-        volume.getName(), destBucket, destKey, srcKeyLen,
-        replication, metadata)) {
-      copyLength = IOUtils.copyLarge(src, dest);
+    if (datastreamEnabled && !(replication != null &&
+        replication.getReplicationType() == EC) &&
+        srcKeyLen > datastreamMinLength) {
+      copyLength = ObjectEndpointStreaming
+          .putKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
+              chunkSize, replication, metadata, src);
+    } else {
+      try (OzoneOutputStream dest = getClientProtocol()
+          .createKey(volume.getName(), destBucket, destKey, srcKeyLen,
+              replication, metadata)) {
+        copyLength = IOUtils.copyLarge(src, dest);
+      }
     }
     getMetrics().incCopyObjectSuccessLength(copyLength);
   }
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index e7ed9face4..460b073b39 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -212,6 +212,10 @@ public class OzoneBucketStub extends OzoneBucket {
             byte[] bytes1 = new byte[buffer.remaining()];
             buffer.get(bytes1);
             keyContents.put(key, bytes1);
+
+            Map<String, String> objectMetadata = keyMetadata == null ?
+                new HashMap<>() : keyMetadata;
+
             keyDetails.put(key, new OzoneKeyDetails(
                 getVolumeName(),
                 getName(),
@@ -219,7 +223,7 @@ public class OzoneBucketStub extends OzoneBucket {
                 size,
                 System.currentTimeMillis(),
                 System.currentTimeMillis(),
-                new ArrayList<>(), rConfig, metadata, null,
+                new ArrayList<>(), rConfig, objectMetadata, null,
                 null, false
             ));
           }
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
new file mode 100644
index 0000000000..d96cd7bdd6
--- /dev/null
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class tests Upload request.
+ */
+public class TestUploadWithStream {
+
+  private static final ObjectEndpoint REST = new ObjectEndpoint();
+
+  private static final String S3BUCKET = "streamb1";
+  private static final String S3KEY = "testkey";
+  private static final String S3_COPY_EXISTING_KEY = "test_copy_existing_key";
+  private static final String S3_COPY_EXISTING_KEY_CONTENT =
+      "test_copy_existing_key_content";
+  private static OzoneClient client;
+  private static final HttpHeaders HEADERS;
+  private static ContainerRequestContext context;
+
+  static {
+    HEADERS = Mockito.mock(HttpHeaders.class);
+    when(HEADERS.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("STANDARD");
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    client = new OzoneClientStub();
+    client.getObjectStore().createS3Bucket(S3BUCKET);
+
+    REST.setHeaders(HEADERS);
+    REST.setClient(client);
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+        true);
+    conf.setStorageSize(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, 1,
+        StorageUnit.BYTES);
+    REST.setOzoneConfiguration(conf);
+
+    context = Mockito.mock(ContainerRequestContext.class);
+    Mockito.when(context.getUriInfo()).thenReturn(Mockito.mock(UriInfo.class));
+    Mockito.when(context.getUriInfo().getQueryParameters())
+        .thenReturn(new MultivaluedHashMap<>());
+    REST.setContext(context);
+
+    REST.init();
+  }
+
+  @Test
+  public void testEnableStream() {
+    assertTrue(REST.isDatastreamEnabled());
+  }
+
+  @Test
+  public void testUpload() throws Exception {
+    byte[] keyContent = S3_COPY_EXISTING_KEY_CONTENT.getBytes(UTF_8);
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(keyContent);
+    Response response = REST.put(S3BUCKET, S3KEY, 0, 0, null, body);
+
+    assertEquals(200, response.getStatus());
+  }
+
+  @Test
+  public void testUploadWithCopy() throws Exception {
+    OzoneBucket bucket =
+        client.getObjectStore().getS3Bucket(S3BUCKET);
+
+    byte[] keyContent = S3_COPY_EXISTING_KEY_CONTENT.getBytes(UTF_8);
+    try (OutputStream stream = bucket
+        .createStreamKey(S3_COPY_EXISTING_KEY, keyContent.length,
+            ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+                ReplicationFactor.THREE), new HashMap<>())) {
+      stream.write(keyContent);
+    }
+
+    final long dataSize = bucket.getKey(S3_COPY_EXISTING_KEY).getDataSize();
+    assertEquals(dataSize, keyContent.length);
+
+
+    Map<String, String> additionalHeaders = new HashMap<>();
+    additionalHeaders
+        .put(COPY_SOURCE_HEADER, S3BUCKET + "/" + S3_COPY_EXISTING_KEY);
+
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
+        "STANDARD");
+
+    additionalHeaders
+        .forEach((k, v) -> when(headers.getHeaderString(k)).thenReturn(v));
+    REST.setHeaders(headers);
+
+    Response response = REST.put(S3BUCKET, S3KEY, 0, 0, null, null);
+
+    assertEquals(200, response.getStatus());
+
+    final long newDataSize = bucket.getKey(S3KEY).getDataSize();
+    assertEquals(dataSize, newDataSize);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to