This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 85eb89b94e HDDS-11483. Make s3g object get and put operation buffer configurable (#7233)
85eb89b94e is described below
commit 85eb89b94e7e98a759e1757b5c54a713a3a12ea0
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Oct 18 02:13:19 2024 +0800
HDDS-11483. Make s3g object get and put operation buffer configurable (#7233)
---
.../common/src/main/resources/ozone-default.xml | 4 ++--
.../hadoop/ozone/s3/S3GatewayConfigKeys.java | 2 +-
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 28 ++++++++++++++++------
.../hadoop/ozone/s3/endpoint/TestObjectPut.java | 19 +++++++++++++--
.../hadoop/ozone/s3/endpoint/TestPartUpload.java | 3 ++-
5 files changed, 43 insertions(+), 13 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9944051f3e..e2231a5c38 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3476,9 +3476,9 @@
<property>
<name>ozone.s3g.client.buffer.size</name>
<tag>OZONE, S3GATEWAY</tag>
- <value>4KB</value>
+ <value>4MB</value>
<description>
- The size of the buffer which is for read block. (4KB by default).
+ The size of the buffer which is for read block. (4MB by default).
</description>
</property>
<property>
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
index a058e413b9..9160025a01 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
@@ -56,7 +56,7 @@ public final class S3GatewayConfigKeys {
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
"ozone.s3g.client.buffer.size";
public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
- "4KB";
+ "4MB";
// S3G kerberos, principal config
public static final String OZONE_S3G_KERBEROS_KEYTAB_FILE_KEY =
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 40b1e013a4..9dbc7b9aab 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -320,7 +320,7 @@ public class ObjectEndpoint extends EndpointBase {
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
- putLength = IOUtils.copyLarge(digestInputStream, output);
+ putLength = IOUtils.copy(digestInputStream, output, getIOBufferSize(length));
eTag = DatatypeConverter.printHexBinary(
digestInputStream.getMessageDigest().digest())
.toLowerCase();
@@ -443,7 +443,7 @@ public class ObjectEndpoint extends EndpointBase {
if (rangeHeaderVal == null || rangeHeader.isReadFull()) {
StreamingOutput output = dest -> {
try (OzoneInputStream key = keyDetails.getContent()) {
- long readLength = IOUtils.copyLarge(key, dest);
+ long readLength = IOUtils.copy(key, dest, getIOBufferSize(keyDetails.getDataSize()));
getMetrics().incGetKeySuccessLength(readLength);
perf.appendSizeBytes(readLength);
}
@@ -467,7 +467,7 @@ public class ObjectEndpoint extends EndpointBase {
try (OzoneInputStream ozoneInputStream = keyDetails.getContent()) {
ozoneInputStream.seek(startOffset);
long readLength = IOUtils.copyLarge(ozoneInputStream, dest, 0,
- copyLength, new byte[bufferSize]);
+ copyLength, new byte[getIOBufferSize(copyLength)]);
getMetrics().incGetKeySuccessLength(readLength);
perf.appendSizeBytes(readLength);
}
@@ -997,7 +997,7 @@ public class ObjectEndpoint extends EndpointBase {
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(
- sourceObject, ozoneOutputStream, 0, length);
+ sourceObject, ozoneOutputStream, 0, length, new byte[getIOBufferSize(length)]);
ozoneOutputStream.getMetadata()
.putAll(sourceKeyDetails.getMetadata());
outputStream = ozoneOutputStream;
@@ -1008,7 +1008,7 @@ public class ObjectEndpoint extends EndpointBase {
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
- copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
+ copyLength = IOUtils.copy(sourceObject, ozoneOutputStream, getIOBufferSize(length));
ozoneOutputStream.getMetadata()
.putAll(sourceKeyDetails.getMetadata());
outputStream = ozoneOutputStream;
@@ -1024,7 +1024,7 @@ public class ObjectEndpoint extends EndpointBase {
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
- putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream);
+ putLength = IOUtils.copy(digestInputStream, ozoneOutputStream, getIOBufferSize(length));
byte[] digest = digestInputStream.getMessageDigest().digest();
ozoneOutputStream.getMetadata()
.put(ETAG,
DatatypeConverter.printHexBinary(digest).toLowerCase());
@@ -1178,7 +1178,7 @@ public class ObjectEndpoint extends EndpointBase {
long metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
- copyLength = IOUtils.copyLarge(src, dest);
+ copyLength = IOUtils.copy(src, dest, getIOBufferSize(srcKeyLen));
String eTag =
DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
dest.getMetadata().put(ETAG, eTag);
}
@@ -1408,4 +1408,18 @@ public class ObjectEndpoint extends EndpointBase {
}
return null;
}
+
+ private int getIOBufferSize(long fileLength) {
+ if (bufferSize == 0) {
+ // this is mainly for unit tests as init() will not be called in the unit tests
+ LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
+ bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
+ }
+ if (fileLength == 0) {
+ // for empty file
+ return bufferSize;
+ } else {
+ return fileLength < bufferSize ? (int) fileLength : bufferSize;
+ }
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index 8cde144a37..8b3d9e1ad2 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -31,6 +31,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
@@ -79,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -368,7 +370,7 @@ class TestObjectPut {
MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// For example, EOFException during put-object due to client cancelling the operation before it completes
- mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
+ mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
@@ -553,7 +555,7 @@ class TestObjectPut {
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
- mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
+ mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);
// Add copy header, and then call put
@@ -731,4 +733,17 @@ class TestObjectPut {
assertEquals(S3ErrorTable.NO_OVERWRITE.getCode(), exception.getCode());
assertEquals(S3ErrorTable.NO_OVERWRITE.getHttpCode(),
exception.getHttpCode());
}
+
+ @Test
+ public void testPutEmptyObject() throws IOException, OS3Exception {
+ HttpHeaders headersWithTags = Mockito.mock(HttpHeaders.class);
+ String emptyString = "";
+ ByteArrayInputStream body = new ByteArrayInputStream(emptyString.getBytes(UTF_8));
+ objectEndpoint.setHeaders(headersWithTags);
+
+ Response putResponse = objectEndpoint.put(BUCKET_NAME, KEY_NAME, emptyString.length(), 1, null, body);
+ assertEquals(200, putResponse.getStatus());
+ OzoneKeyDetails keyDetails = clientStub.getObjectStore().getS3Bucket(BUCKET_NAME).getKey(KEY_NAME);
+ assertEquals(0, keyDetails.getDataSize());
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index aecc56fe17..fbff764829 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
@@ -234,7 +235,7 @@ public class TestPartUpload {
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
- mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
+ mocked.when(() -> IOUtils.copy(any(InputStream.class), any(OutputStream.class), anyInt()))
.thenThrow(IOException.class);
String content = "Multipart Upload";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]