This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c1bcdeac54 HDDS-10832. Client should switch to streaming based on
OpenKeySession replication (#6683)
c1bcdeac54 is described below
commit c1bcdeac54db392d70f8d7bab2994fd626d9d68d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed May 22 10:43:18 2024 +0200
HDDS-10832. Client should switch to streaming based on OpenKeySession
replication (#6683)
---
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 90 ++++++++++++----------
.../client/rpc/TestOzoneRpcClientWithRatis.java | 19 +++--
2 files changed, 59 insertions(+), 50 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 42b53e0d23..d0266c95a2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -1959,11 +1960,16 @@ public class RpcClient implements ClientProtocol {
long size, int partNumber, String uploadID) throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID, false);
+ return createMultipartOutputStream(openKey, uploadID, partNumber);
+ }
+
+ private OzoneOutputStream createMultipartOutputStream(
+ OpenKeySession openKey, String uploadID, int partNumber
+ ) throws IOException {
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
- .setAtomicKeyCreation(isS3GRequest.get())
.build();
return createOutputStream(openKey, keyOutputStream);
}
@@ -1979,29 +1985,25 @@ public class RpcClient implements ClientProtocol {
throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID, true);
- // Amazon S3 never adds partial objects, So for S3 requests we need to
- // set atomicKeyCreation to true
- // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
- KeyDataStreamOutput keyOutputStream =
- new KeyDataStreamOutput.Builder()
- .setHandler(openKey)
- .setXceiverClientManager(xceiverClientManager)
- .setOmClient(ozoneManagerClient)
- .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
- .setMultipartNumber(partNumber)
- .setMultipartUploadID(uploadID)
- .setIsMultipartKey(true)
- .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(clientConfig)
- .setAtomicKeyCreation(isS3GRequest.get())
- .build();
- keyOutputStream
- .addPreallocateBlocks(
- openKey.getKeyInfo().getLatestVersionLocations(),
- openKey.getOpenVersion());
- final OzoneOutputStream out = createSecureOutputStream(
- openKey, keyOutputStream, null);
- return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
+ final ByteBufferStreamOutput out;
+ ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig();
+ if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
+ KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
+ .setHandler(openKey)
+ .setReplicationConfig(replicationConfig)
+ .setMultipartNumber(partNumber)
+ .setMultipartUploadID(uploadID)
+ .setIsMultipartKey(true)
+ .build();
+ keyOutputStream.addPreallocateBlocks(
+ openKey.getKeyInfo().getLatestVersionLocations(),
+ openKey.getOpenVersion());
+ final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
+ out = secureOut != null ? secureOut : keyOutputStream;
+ } else {
+ out = createMultipartOutputStream(openKey, uploadID, partNumber);
+ }
+ return new OzoneDataStreamOutput(out);
}
@Override
@@ -2403,25 +2405,33 @@ public class RpcClient implements ClientProtocol {
throws IOException {
final ReplicationConfig replicationConfig
= openKey.getKeyInfo().getReplicationConfig();
+ final ByteBufferStreamOutput out;
+ if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
+ KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
+ .setHandler(openKey)
+ .setReplicationConfig(replicationConfig)
+ .build();
+ keyOutputStream.addPreallocateBlocks(
+ openKey.getKeyInfo().getLatestVersionLocations(),
+ openKey.getOpenVersion());
+ final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
+ out = secureOut != null ? secureOut : keyOutputStream;
+ } else {
+ out = createOutputStream(openKey);
+ }
+ return new OzoneDataStreamOutput(out);
+ }
+
+ private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
- KeyDataStreamOutput keyOutputStream =
- new KeyDataStreamOutput.Builder()
- .setHandler(openKey)
- .setXceiverClientManager(xceiverClientManager)
- .setOmClient(ozoneManagerClient)
- .setReplicationConfig(replicationConfig)
- .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(clientConfig)
- .setAtomicKeyCreation(isS3GRequest.get())
- .build();
- keyOutputStream
- .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
- openKey.getOpenVersion());
- final OzoneOutputStream out = createSecureOutputStream(
- openKey, keyOutputStream, null);
- return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
+ return new KeyDataStreamOutput.Builder()
+ .setXceiverClientManager(xceiverClientManager)
+ .setOmClient(ozoneManagerClient)
+ .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+ .setConfig(clientConfig)
+ .setAtomicKeyCreation(isS3GRequest.get());
}
private OzoneOutputStream createOutputStream(OpenKeySession openKey)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 4ecbd08a41..c4a452e168 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
+import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
@@ -61,6 +62,8 @@ import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
@@ -170,12 +173,13 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
}
}
- @Test
- public void testMultiPartUploadWithStream()
+ @ParameterizedTest
+ @MethodSource("replicationConfigs")
+ void testMultiPartUploadWithStream(ReplicationConfig replicationConfig)
throws IOException, NoSuchAlgorithmException {
String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
- String keyName = UUID.randomUUID().toString();
+ String bucketName = replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) + "-bucket";
+ String keyName = replicationConfig.getReplication();
byte[] sampleData = new byte[1024 * 8];
@@ -186,11 +190,6 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- ReplicationConfig replicationConfig =
- ReplicationConfig.fromTypeAndFactor(
- ReplicationType.RATIS,
- THREE);
-
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
replicationConfig);
@@ -210,7 +209,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
OzoneMultipartUploadPartListParts parts =
bucket.listParts(keyName, uploadID, 0, 1);
- assertEquals(parts.getPartInfoList().size(), 1);
+ assertEquals(1, parts.getPartInfoList().size());
OzoneMultipartUploadPartListParts.PartInfo partInfo =
parts.getPartInfoList().get(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]