This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c1bcdeac54 HDDS-10832. Client should switch to streaming based on OpenKeySession replication (#6683)
c1bcdeac54 is described below

commit c1bcdeac54db392d70f8d7bab2994fd626d9d68d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Wed May 22 10:43:18 2024 +0200

    HDDS-10832. Client should switch to streaming based on OpenKeySession replication (#6683)
---
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 90 ++++++++++++----------
 .../client/rpc/TestOzoneRpcClientWithRatis.java    | 19 +++--
 2 files changed, 59 insertions(+), 50 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 42b53e0d23..d0266c95a2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -1959,11 +1960,16 @@ public class RpcClient implements ClientProtocol {
       long size, int partNumber, String uploadID) throws IOException {
     final OpenKeySession openKey = newMultipartOpenKey(
         volumeName, bucketName, keyName, size, partNumber, uploadID, false);
+    return createMultipartOutputStream(openKey, uploadID, partNumber);
+  }
+
+  private OzoneOutputStream createMultipartOutputStream(
+      OpenKeySession openKey, String uploadID, int partNumber
+  ) throws IOException {
     KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
         .setMultipartNumber(partNumber)
         .setMultipartUploadID(uploadID)
         .setIsMultipartKey(true)
-        .setAtomicKeyCreation(isS3GRequest.get())
         .build();
     return createOutputStream(openKey, keyOutputStream);
   }
@@ -1979,29 +1985,25 @@ public class RpcClient implements ClientProtocol {
       throws IOException {
     final OpenKeySession openKey = newMultipartOpenKey(
         volumeName, bucketName, keyName, size, partNumber, uploadID, true);
-    // Amazon S3 never adds partial objects, So for S3 requests we need to
-    // set atomicKeyCreation to true
-    // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
-    KeyDataStreamOutput keyOutputStream =
-        new KeyDataStreamOutput.Builder()
-            .setHandler(openKey)
-            .setXceiverClientManager(xceiverClientManager)
-            .setOmClient(ozoneManagerClient)
-            .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
-            .setMultipartNumber(partNumber)
-            .setMultipartUploadID(uploadID)
-            .setIsMultipartKey(true)
-            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-            .setConfig(clientConfig)
-            .setAtomicKeyCreation(isS3GRequest.get())
-            .build();
-    keyOutputStream
-        .addPreallocateBlocks(
-            openKey.getKeyInfo().getLatestVersionLocations(),
-            openKey.getOpenVersion());
-    final OzoneOutputStream out = createSecureOutputStream(
-        openKey, keyOutputStream, null);
-    return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
+    final ByteBufferStreamOutput out;
+    ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig();
+    if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
+      KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
+          .setHandler(openKey)
+          .setReplicationConfig(replicationConfig)
+          .setMultipartNumber(partNumber)
+          .setMultipartUploadID(uploadID)
+          .setIsMultipartKey(true)
+          .build();
+      keyOutputStream.addPreallocateBlocks(
+          openKey.getKeyInfo().getLatestVersionLocations(),
+          openKey.getOpenVersion());
+      final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
+      out = secureOut != null ? secureOut : keyOutputStream;
+    } else {
+      out = createMultipartOutputStream(openKey, uploadID, partNumber);
+    }
+    return new OzoneDataStreamOutput(out);
   }
 
   @Override
@@ -2403,25 +2405,33 @@ public class RpcClient implements ClientProtocol {
       throws IOException {
     final ReplicationConfig replicationConfig
         = openKey.getKeyInfo().getReplicationConfig();
+    final ByteBufferStreamOutput out;
+    if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
+      KeyDataStreamOutput keyOutputStream = newKeyOutputStreamBuilder()
+          .setHandler(openKey)
+          .setReplicationConfig(replicationConfig)
+          .build();
+      keyOutputStream.addPreallocateBlocks(
+          openKey.getKeyInfo().getLatestVersionLocations(),
+          openKey.getOpenVersion());
+      final OzoneOutputStream secureOut = createSecureOutputStream(openKey, keyOutputStream, null);
+      out = secureOut != null ? secureOut : keyOutputStream;
+    } else {
+      out = createOutputStream(openKey);
+    }
+    return new OzoneDataStreamOutput(out);
+  }
+
+  private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
     // Amazon S3 never adds partial objects, So for S3 requests we need to
     // set atomicKeyCreation to true
     // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
-    KeyDataStreamOutput keyOutputStream =
-        new KeyDataStreamOutput.Builder()
-            .setHandler(openKey)
-            .setXceiverClientManager(xceiverClientManager)
-            .setOmClient(ozoneManagerClient)
-            .setReplicationConfig(replicationConfig)
-            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-            .setConfig(clientConfig)
-            .setAtomicKeyCreation(isS3GRequest.get())
-            .build();
-    keyOutputStream
-        .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
-            openKey.getOpenVersion());
-    final OzoneOutputStream out = createSecureOutputStream(
-        openKey, keyOutputStream, null);
-    return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
+    return new KeyDataStreamOutput.Builder()
+        .setXceiverClientManager(xceiverClientManager)
+        .setOmClient(ozoneManagerClient)
+        .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+        .setConfig(clientConfig)
+        .setAtomicKeyCreation(isS3GRequest.get());
   }
 
   private OzoneOutputStream createOutputStream(OpenKeySession openKey)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 4ecbd08a41..c4a452e168 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
@@ -61,6 +62,8 @@ import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
@@ -170,12 +173,13 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
     }
   }
 
-  @Test
-  public void testMultiPartUploadWithStream()
+  @ParameterizedTest
+  @MethodSource("replicationConfigs")
+  void testMultiPartUploadWithStream(ReplicationConfig replicationConfig)
       throws IOException, NoSuchAlgorithmException {
     String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
+    String bucketName = replicationConfig.getReplicationType().name().toLowerCase(Locale.ROOT) + "-bucket";
+    String keyName = replicationConfig.getReplication();
 
     byte[] sampleData = new byte[1024 * 8];
 
@@ -186,11 +190,6 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    ReplicationConfig replicationConfig =
-        ReplicationConfig.fromTypeAndFactor(
-            ReplicationType.RATIS,
-            THREE);
-
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
         replicationConfig);
 
@@ -210,7 +209,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
     OzoneMultipartUploadPartListParts parts =
         bucket.listParts(keyName, uploadID, 0, 1);
 
-    assertEquals(parts.getPartInfoList().size(), 1);
+    assertEquals(1, parts.getPartInfoList().size());
 
     OzoneMultipartUploadPartListParts.PartInfo partInfo =
         parts.getPartInfoList().get(0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to