This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 85eb89b94e HDDS-11483. Make s3g object get and put operation buffer 
configurable (#7233)
85eb89b94e is described below

commit 85eb89b94e7e98a759e1757b5c54a713a3a12ea0
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Oct 18 02:13:19 2024 +0800

    HDDS-11483. Make s3g object get and put operation buffer configurable 
(#7233)
---
 .../common/src/main/resources/ozone-default.xml    |  4 ++--
 .../hadoop/ozone/s3/S3GatewayConfigKeys.java       |  2 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 28 ++++++++++++++++------
 .../hadoop/ozone/s3/endpoint/TestObjectPut.java    | 19 +++++++++++++--
 .../hadoop/ozone/s3/endpoint/TestPartUpload.java   |  3 ++-
 5 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9944051f3e..e2231a5c38 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3476,9 +3476,9 @@
   <property>
     <name>ozone.s3g.client.buffer.size</name>
     <tag>OZONE, S3GATEWAY</tag>
-    <value>4KB</value>
+    <value>4MB</value>
     <description>
-      The size of the buffer which is for read block. (4KB by default).
+      The size of the buffer which is for read block. (4MB by default).
     </description>
   </property>
   <property>
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
index a058e413b9..9160025a01 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
@@ -56,7 +56,7 @@ public final class S3GatewayConfigKeys {
   public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
       "ozone.s3g.client.buffer.size";
   public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
-      "4KB";
+      "4MB";
 
   // S3G kerberos, principal config
   public static final String OZONE_S3G_KERBEROS_KEYTAB_FILE_KEY =
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 40b1e013a4..9dbc7b9aab 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -320,7 +320,7 @@ public class ObjectEndpoint extends EndpointBase {
           long metadataLatencyNs =
               getMetrics().updatePutKeyMetadataStats(startNanos);
           perf.appendMetaLatencyNanos(metadataLatencyNs);
-          putLength = IOUtils.copyLarge(digestInputStream, output);
+          putLength = IOUtils.copy(digestInputStream, output, 
getIOBufferSize(length));
           eTag = DatatypeConverter.printHexBinary(
                   digestInputStream.getMessageDigest().digest())
               .toLowerCase();
@@ -443,7 +443,7 @@ public class ObjectEndpoint extends EndpointBase {
       if (rangeHeaderVal == null || rangeHeader.isReadFull()) {
         StreamingOutput output = dest -> {
           try (OzoneInputStream key = keyDetails.getContent()) {
-            long readLength = IOUtils.copyLarge(key, dest);
+            long readLength = IOUtils.copy(key, dest, 
getIOBufferSize(keyDetails.getDataSize()));
             getMetrics().incGetKeySuccessLength(readLength);
             perf.appendSizeBytes(readLength);
           }
@@ -467,7 +467,7 @@ public class ObjectEndpoint extends EndpointBase {
           try (OzoneInputStream ozoneInputStream = keyDetails.getContent()) {
             ozoneInputStream.seek(startOffset);
             long readLength = IOUtils.copyLarge(ozoneInputStream, dest, 0,
-                copyLength, new byte[bufferSize]);
+                copyLength, new byte[getIOBufferSize(copyLength)]);
             getMetrics().incGetKeySuccessLength(readLength);
             perf.appendSizeBytes(readLength);
           }
@@ -997,7 +997,7 @@ public class ObjectEndpoint extends EndpointBase {
               metadataLatencyNs =
                   getMetrics().updateCopyKeyMetadataStats(startNanos);
               copyLength = IOUtils.copyLarge(
-                  sourceObject, ozoneOutputStream, 0, length);
+                  sourceObject, ozoneOutputStream, 0, length, new 
byte[getIOBufferSize(length)]);
               ozoneOutputStream.getMetadata()
                   .putAll(sourceKeyDetails.getMetadata());
               outputStream = ozoneOutputStream;
@@ -1008,7 +1008,7 @@ public class ObjectEndpoint extends EndpointBase {
                     partNumber, uploadID)) {
               metadataLatencyNs =
                   getMetrics().updateCopyKeyMetadataStats(startNanos);
-              copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
+              copyLength = IOUtils.copy(sourceObject, ozoneOutputStream, 
getIOBufferSize(length));
               ozoneOutputStream.getMetadata()
                   .putAll(sourceKeyDetails.getMetadata());
               outputStream = ozoneOutputStream;
@@ -1024,7 +1024,7 @@ public class ObjectEndpoint extends EndpointBase {
                 partNumber, uploadID)) {
           metadataLatencyNs =
               getMetrics().updatePutKeyMetadataStats(startNanos);
-          putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream);
+          putLength = IOUtils.copy(digestInputStream, ozoneOutputStream, 
getIOBufferSize(length));
           byte[] digest = digestInputStream.getMessageDigest().digest();
           ozoneOutputStream.getMetadata()
               .put(ETAG, 
DatatypeConverter.printHexBinary(digest).toLowerCase());
@@ -1178,7 +1178,7 @@ public class ObjectEndpoint extends EndpointBase {
         long metadataLatencyNs =
             getMetrics().updateCopyKeyMetadataStats(startNanos);
         perf.appendMetaLatencyNanos(metadataLatencyNs);
-        copyLength = IOUtils.copyLarge(src, dest);
+        copyLength = IOUtils.copy(src, dest, getIOBufferSize(srcKeyLen));
         String eTag = 
DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
         dest.getMetadata().put(ETAG, eTag);
       }
@@ -1408,4 +1408,18 @@ public class ObjectEndpoint extends EndpointBase {
     }
     return null;
   }
+
+  private int getIOBufferSize(long fileLength) {
+    if (bufferSize == 0) {
+      // this is mainly for unit tests as init() will not be called in the 
unit tests
+      LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
+      bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
+    }
+    if (fileLength == 0) {
+      // for empty file
+      return bufferSize;
+    } else {
+      return fileLength < bufferSize ? (int) fileLength : bufferSize;
+    }
+  }
 }
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index 8cde144a37..8b3d9e1ad2 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -31,6 +31,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -79,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -368,7 +370,7 @@ class TestObjectPut {
     MessageDigest messageDigest = mock(MessageDigest.class);
     try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
       // For example, EOFException during put-object due to client cancelling 
the operation before it completes
-      mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), 
any(OutputStream.class)))
+      mocked.when(() -> IOUtils.copy(any(InputStream.class), 
any(OutputStream.class), anyInt()))
           .thenThrow(IOException.class);
       
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
 
@@ -553,7 +555,7 @@ class TestObjectPut {
     try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
       // Add the mocked methods only during the copy request
       
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
-      mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), 
any(OutputStream.class)))
+      mocked.when(() -> IOUtils.copy(any(InputStream.class), 
any(OutputStream.class), anyInt()))
           .thenThrow(IOException.class);
 
       // Add copy header, and then call put
@@ -731,4 +733,17 @@ class TestObjectPut {
     assertEquals(S3ErrorTable.NO_OVERWRITE.getCode(), exception.getCode());
     assertEquals(S3ErrorTable.NO_OVERWRITE.getHttpCode(), 
exception.getHttpCode());
   }
+
+  @Test
+  public void testPutEmptyObject() throws IOException, OS3Exception {
+    HttpHeaders headersWithTags = Mockito.mock(HttpHeaders.class);
+    String emptyString = "";
+    ByteArrayInputStream body = new 
ByteArrayInputStream(emptyString.getBytes(UTF_8));
+    objectEndpoint.setHeaders(headersWithTags);
+
+    Response putResponse = objectEndpoint.put(BUCKET_NAME, KEY_NAME, 
emptyString.length(), 1, null, body);
+    assertEquals(200, putResponse.getStatus());
+    OzoneKeyDetails keyDetails = 
clientStub.getObjectStore().getS3Bucket(BUCKET_NAME).getKey(KEY_NAME);
+    assertEquals(0, keyDetails.getDataSize());
+  }
 }
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index aecc56fe17..fbff764829 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
@@ -234,7 +235,7 @@ public class TestPartUpload {
     try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
       // Add the mocked methods only during the copy request
       
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
-      mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), 
any(OutputStream.class)))
+      mocked.when(() -> IOUtils.copy(any(InputStream.class), 
any(OutputStream.class), anyInt()))
           .thenThrow(IOException.class);
 
       String content = "Multipart Upload";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to