This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new eb0836e6ac HDDS-9201. [Ozone-Streaming] Stream copy object support on gateway (#5210)
eb0836e6ac is described below
commit eb0836e6acb1fa61f2f0e497a038ab795cdee200
Author: hao guo <[email protected]>
AuthorDate: Thu Sep 7 12:27:20 2023 +0800
HDDS-9201. [Ozone-Streaming] Stream copy object support on gateway (#5210)
* HDDS-9201. Stream copy object support on gateway
---
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 17 ++-
.../hadoop/ozone/client/OzoneBucketStub.java | 6 +-
.../ozone/s3/endpoint/TestUploadWithStream.java | 150 +++++++++++++++++++++
3 files changed, 167 insertions(+), 6 deletions(-)
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 28c27977ac..5ef67f3c6f 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -1010,11 +1010,18 @@ public class ObjectEndpoint extends EndpointBase {
ReplicationConfig replication,
Map<String, String> metadata) throws IOException {
long copyLength;
- try (OzoneOutputStream dest =
- getClientProtocol().createKey(
- volume.getName(), destBucket, destKey, srcKeyLen,
- replication, metadata)) {
- copyLength = IOUtils.copyLarge(src, dest);
+ if (datastreamEnabled && !(replication != null &&
+ replication.getReplicationType() == EC) &&
+ srcKeyLen > datastreamMinLength) {
+ copyLength = ObjectEndpointStreaming
+ .putKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
+ chunkSize, replication, metadata, src);
+ } else {
+ try (OzoneOutputStream dest = getClientProtocol()
+ .createKey(volume.getName(), destBucket, destKey, srcKeyLen,
+ replication, metadata)) {
+ copyLength = IOUtils.copyLarge(src, dest);
+ }
}
getMetrics().incCopyObjectSuccessLength(copyLength);
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index e7ed9face4..460b073b39 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -212,6 +212,10 @@ public class OzoneBucketStub extends OzoneBucket {
byte[] bytes1 = new byte[buffer.remaining()];
buffer.get(bytes1);
keyContents.put(key, bytes1);
+
+ Map<String, String> objectMetadata = keyMetadata == null ?
+ new HashMap<>() : keyMetadata;
+
keyDetails.put(key, new OzoneKeyDetails(
getVolumeName(),
getName(),
@@ -219,7 +223,7 @@ public class OzoneBucketStub extends OzoneBucket {
size,
System.currentTimeMillis(),
System.currentTimeMillis(),
- new ArrayList<>(), rConfig, metadata, null,
+ new ArrayList<>(), rConfig, objectMetadata, null,
null, false
));
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
new file mode 100644
index 0000000000..d96cd7bdd6
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * This class tests Upload request.
+ */
+public class TestUploadWithStream {
+
+ private static final ObjectEndpoint REST = new ObjectEndpoint();
+
+ private static final String S3BUCKET = "streamb1";
+ private static final String S3KEY = "testkey";
+ private static final String S3_COPY_EXISTING_KEY = "test_copy_existing_key";
+ private static final String S3_COPY_EXISTING_KEY_CONTENT =
+ "test_copy_existing_key_content";
+ private static OzoneClient client;
+ private static final HttpHeaders HEADERS;
+ private static ContainerRequestContext context;
+
+ static {
+ HEADERS = Mockito.mock(HttpHeaders.class);
+ when(HEADERS.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("STANDARD");
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ client = new OzoneClientStub();
+ client.getObjectStore().createS3Bucket(S3BUCKET);
+
+ REST.setHeaders(HEADERS);
+ REST.setClient(client);
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+ true);
+ conf.setStorageSize(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, 1,
+ StorageUnit.BYTES);
+ REST.setOzoneConfiguration(conf);
+
+ context = Mockito.mock(ContainerRequestContext.class);
+ Mockito.when(context.getUriInfo()).thenReturn(Mockito.mock(UriInfo.class));
+ Mockito.when(context.getUriInfo().getQueryParameters())
+ .thenReturn(new MultivaluedHashMap<>());
+ REST.setContext(context);
+
+ REST.init();
+ }
+
+ @Test
+ public void testEnableStream() {
+ assertTrue(REST.isDatastreamEnabled());
+ }
+
+ @Test
+ public void testUpload() throws Exception {
+ byte[] keyContent = S3_COPY_EXISTING_KEY_CONTENT.getBytes(UTF_8);
+ ByteArrayInputStream body =
+ new ByteArrayInputStream(keyContent);
+ Response response = REST.put(S3BUCKET, S3KEY, 0, 0, null, body);
+
+ assertEquals(200, response.getStatus());
+ }
+
+ @Test
+ public void testUploadWithCopy() throws Exception {
+ OzoneBucket bucket =
+ client.getObjectStore().getS3Bucket(S3BUCKET);
+
+ byte[] keyContent = S3_COPY_EXISTING_KEY_CONTENT.getBytes(UTF_8);
+ try (OutputStream stream = bucket
+ .createStreamKey(S3_COPY_EXISTING_KEY, keyContent.length,
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+ ReplicationFactor.THREE), new HashMap<>())) {
+ stream.write(keyContent);
+ }
+
+ final long dataSize = bucket.getKey(S3_COPY_EXISTING_KEY).getDataSize();
+ assertEquals(dataSize, keyContent.length);
+
+
+ Map<String, String> additionalHeaders = new HashMap<>();
+ additionalHeaders
+ .put(COPY_SOURCE_HEADER, S3BUCKET + "/" + S3_COPY_EXISTING_KEY);
+
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
+ "STANDARD");
+
+ additionalHeaders
+ .forEach((k, v) -> when(headers.getHeaderString(k)).thenReturn(v));
+ REST.setHeaders(headers);
+
+ Response response = REST.put(S3BUCKET, S3KEY, 0, 0, null, null);
+
+ assertEquals(200, response.getStatus());
+
+ final long newDataSize = bucket.getKey(S3KEY).getDataSize();
+ assertEquals(dataSize, newDataSize);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]