This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new c55f30e  HDDS-6412: EC: Handle ECReplicationConfig in initiateMultipartUpload API (#3184)
c55f30e is described below

commit c55f30edf0fd36746c950761dbc3ff76f6d1c785
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Mon Mar 28 13:55:49 2022 -0700

    HDDS-6412: EC: Handle ECReplicationConfig in initiateMultipartUpload API (#3184)
---
 ...OzoneManagerProtocolClientSideTranslatorPB.java | 10 +++--
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  | 48 ++++++++++++++--------
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 48 ++++++++++------------
 .../hadoop/ozone/client/OzoneBucketStub.java       |  8 ++++
 .../s3/endpoint/TestInitiateMultipartUpload.java   | 35 ++++++++++++++--
 .../ozone/s3/endpoint/TestPermissionCheck.java     |  3 +-
 6 files changed, 99 insertions(+), 53 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 6f381d0..b1ef8fe 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -963,9 +963,13 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
             OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
 
     if (omKeyArgs.getReplicationConfig() != null) {
-      keyArgs.setFactor(
-              ReplicationConfig
-                      .getLegacyFactor(omKeyArgs.getReplicationConfig()));
+      if (omKeyArgs.getReplicationConfig() instanceof ECReplicationConfig) {
+        keyArgs.setEcReplicationConfig(
+            ((ECReplicationConfig) omKeyArgs.getReplicationConfig()).toProto());
+      } else {
+        keyArgs.setFactor(ReplicationConfig
+            .getLegacyFactor(omKeyArgs.getReplicationConfig()));
+      }
       keyArgs.setType(omKeyArgs.getReplicationConfig().getReplicationType());
     }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index 7ff190a..c9babb8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -34,7 +34,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
@@ -253,7 +255,7 @@ public class TestOmMetrics {
   @Test
   public void testKeyOps() throws Exception {
     // This test needs a cluster with DNs and SCM to wait on safemode
-    clusterBuilder.setNumDatanodes(3);
+    clusterBuilder.setNumDatanodes(5);
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, true);
     startCluster();
     String volumeName = UUID.randomUUID().toString();
@@ -262,7 +264,8 @@ public class TestOmMetrics {
         .getInternalState(ozoneManager, "keyManager");
     KeyManager mockKm = Mockito.spy(keyManager);
     TestDataUtil.createVolumeAndBucket(cluster, volumeName, bucketName);
-    OmKeyArgs keyArgs = createKeyArgs(volumeName, bucketName);
+    OmKeyArgs keyArgs = createKeyArgs(volumeName, bucketName,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
     doKeyOps(keyArgs);
 
     MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
@@ -275,18 +278,26 @@ public class TestOmMetrics {
     assertCounter("NumKeys", 0L, omMetrics);
     assertCounter("NumInitiateMultipartUploads", 1L, omMetrics);
 
-    keyArgs = createKeyArgs(volumeName, bucketName);
+    keyArgs = createKeyArgs(volumeName, bucketName,
+        new ECReplicationConfig("rs-3-2-1024K"));
+    doKeyOps(keyArgs);
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumKeyOps", 14L, omMetrics);
+
+    keyArgs = createKeyArgs(volumeName, bucketName,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
     OpenKeySession keySession = writeClient.openKey(keyArgs);
     writeClient.commitKey(keyArgs, keySession.getId());
-    keyArgs = createKeyArgs(volumeName, bucketName);
+    keyArgs = createKeyArgs(volumeName, bucketName,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
     keySession = writeClient.openKey(keyArgs);
     writeClient.commitKey(keyArgs, keySession.getId());
-    keyArgs = createKeyArgs(volumeName, bucketName);
+    keyArgs = createKeyArgs(volumeName, bucketName,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
     keySession = writeClient.openKey(keyArgs);
     writeClient.commitKey(keyArgs, keySession.getId());
     writeClient.deleteKey(keyArgs);
 
-
     omMetrics = getMetrics("OMMetrics");
     assertCounter("NumKeys", 2L, omMetrics);
 
@@ -301,17 +312,18 @@ public class TestOmMetrics {
 
     // inject exception to test for Failure Metrics on the write path
     mockWritePathExceptions(OmBucketInfo.class);
-    keyArgs = createKeyArgs(volumeName, bucketName);
+    keyArgs = createKeyArgs(volumeName, bucketName,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
     doKeyOps(keyArgs);
 
     omMetrics = getMetrics("OMMetrics");
-    assertCounter("NumKeyOps", 21L, omMetrics);
-    assertCounter("NumKeyAllocate", 5L, omMetrics);
-    assertCounter("NumKeyLookup", 2L, omMetrics);
-    assertCounter("NumKeyDeletes", 3L, omMetrics);
-    assertCounter("NumKeyLists", 2L, omMetrics);
-    assertCounter("NumTrashKeyLists", 2L, omMetrics);
-    assertCounter("NumInitiateMultipartUploads", 2L, omMetrics);
+    assertCounter("NumKeyOps", 28L, omMetrics);
+    assertCounter("NumKeyAllocate", 6L, omMetrics);
+    assertCounter("NumKeyLookup", 3L, omMetrics);
+    assertCounter("NumKeyDeletes", 4L, omMetrics);
+    assertCounter("NumKeyLists", 3L, omMetrics);
+    assertCounter("NumTrashKeyLists", 3L, omMetrics);
+    assertCounter("NumInitiateMultipartUploads", 3L, omMetrics);
 
     assertCounter("NumKeyAllocateFails", 1L, omMetrics);
     assertCounter("NumKeyLookupFails", 1L, omMetrics);
@@ -556,8 +568,8 @@ public class TestOmMetrics {
     }
   }
 
-  private OmKeyArgs createKeyArgs(String volumeName, String bucketName)
-      throws IOException {
+  private OmKeyArgs createKeyArgs(String volumeName, String bucketName,
+      ReplicationConfig repConfig) throws IOException {
     OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
         .setBlockID(new BlockID(new ContainerBlockID(1, 1)))
         .setPipeline(MockPipeline.createSingleNodePipeline())
@@ -571,10 +583,10 @@ public class TestOmMetrics {
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setAcls(Lists.emptyList())
-        .setReplicationConfig(RatisReplicationConfig.getInstance(
-            HddsProtos.ReplicationFactor.THREE))
+        .setReplicationConfig(repConfig)
         .build();
   }
+
   private OmVolumeArgs createVolumeArgs() {
     String volumeName = UUID.randomUUID().toString();
     return new OmVolumeArgs.Builder()
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 99a08d6..d048550 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -53,8 +53,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -178,24 +178,12 @@ public class ObjectEndpoint extends EndpointBase {
     try {
       copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
       storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+      boolean storageTypeDefault = StringUtils.isEmpty(storageType);
 
       // Normal put object
       OzoneBucket bucket = getBucket(bucketName);
-      ReplicationConfig clientConfiguredReplicationConfig = null;
-      String replication = ozoneConfiguration.get(OZONE_REPLICATION);
-      if (replication != null) {
-        clientConfiguredReplicationConfig = ReplicationConfig.parse(
-            ReplicationType.valueOf(ozoneConfiguration
-                .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
-            replication, ozoneConfiguration);
-      }
-      ReplicationConfig replicationConfig = S3Utils
-          .resolveS3ClientSideReplicationConfig(storageType,
-              clientConfiguredReplicationConfig, bucket.getReplicationConfig());
-      boolean storageTypeDefault = false;
-      if (storageType == null || storageType.equals("")) {
-        storageTypeDefault = true;
-      }
+      ReplicationConfig replicationConfig =
+          getReplicationConfig(bucket, storageType);
 
       if (copyHeader != null) {
         //Copy object, as copy source available.
@@ -498,17 +486,11 @@ public class ObjectEndpoint extends EndpointBase {
       OzoneBucket ozoneBucket = getBucket(bucket);
       String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
 
-      S3StorageType s3StorageType;
-      if (storageType == null || storageType.equals("")) {
-        s3StorageType = S3StorageType.getDefault(ozoneConfiguration);
-      } else {
-        s3StorageType = S3Utils.toS3StorageType(storageType);
-      }
-      ReplicationType replicationType = s3StorageType.getType();
-      ReplicationFactor replicationFactor = s3StorageType.getFactor();
+      ReplicationConfig replicationConfig =
+          getReplicationConfig(ozoneBucket, storageType);
 
-      OmMultipartInfo multipartInfo = ozoneBucket
-          .initiateMultipartUpload(key, replicationType, replicationFactor);
+      OmMultipartInfo multipartInfo =
+          ozoneBucket.initiateMultipartUpload(key, replicationConfig);
 
       MultipartUploadInitiateResponse multipartUploadInitiateResponse = new
           MultipartUploadInitiateResponse();
@@ -531,6 +513,20 @@ public class ObjectEndpoint extends EndpointBase {
     }
   }
 
+  private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket,
+      String storageType) throws OS3Exception {
+    ReplicationConfig clientConfiguredReplicationConfig = null;
+    String replication = ozoneConfiguration.get(OZONE_REPLICATION);
+    if (replication != null) {
+      clientConfiguredReplicationConfig = ReplicationConfig.parse(
+          ReplicationType.valueOf(ozoneConfiguration
+              .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
+          replication, ozoneConfiguration);
+    }
+    return S3Utils.resolveS3ClientSideReplicationConfig(storageType,
+        clientConfiguredReplicationConfig, ozoneBucket.getReplicationConfig());
+  }
+
   /**
    * Complete a multipart upload.
    */
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index fcddfde..f4b5846 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -228,6 +228,14 @@ public class OzoneBucketStub extends OzoneBucket {
   }
 
   @Override
+  public OmMultipartInfo initiateMultipartUpload(String keyName,
+      ReplicationConfig repConfig) throws IOException {
+    String uploadID = UUID.randomUUID().toString();
+    multipartUploadIdMap.put(keyName, uploadID);
+    return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID);
+  }
+
+  @Override
   public OzoneOutputStream createMultipartKey(String key, long size,
                                               int partNumber, String uploadID)
       throws IOException {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
index 2fa396f..8bd0b84d 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
@@ -20,10 +20,12 @@
 
 package org.apache.hadoop.ozone.s3.endpoint;
 
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -53,10 +55,7 @@ public class TestInitiateMultipartUpload {
     when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
         "STANDARD");
 
-    ObjectEndpoint rest = new ObjectEndpoint();
-    rest.setHeaders(headers);
-    rest.setClient(client);
-    rest.setOzoneConfiguration(new OzoneConfiguration());
+    ObjectEndpoint rest = getObjectEndpoint(client, headers);
 
     Response response = rest.initializeMultipartUpload(bucket, key);
 
@@ -74,4 +73,32 @@ public class TestInitiateMultipartUpload {
     assertNotNull(multipartUploadInitiateResponse.getUploadID());
     assertNotEquals(multipartUploadInitiateResponse.getUploadID(), uploadID);
   }
+
+  @Test
+  public void testInitiateMultipartUploadWithECKey() throws Exception {
+    String bucket = OzoneConsts.S3_BUCKET;
+    String key = OzoneConsts.KEY;
+    OzoneClient client = new OzoneClientStub();
+    client.getObjectStore().createS3Bucket(bucket);
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    ObjectEndpoint rest = getObjectEndpoint(client, headers);
+    client.getObjectStore().getS3Bucket(bucket)
+        .setReplicationConfig(new ECReplicationConfig("rs-3-2-1024K"));
+    Response response = rest.initializeMultipartUpload(bucket, key);
+
+    assertEquals(200, response.getStatus());
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+  }
+
+  @NotNull
+  private ObjectEndpoint getObjectEndpoint(OzoneClient client,
+      HttpHeaders headers) {
+    ObjectEndpoint rest = new ObjectEndpoint();
+    rest.setHeaders(headers);
+    rest.setClient(client);
+    rest.setOzoneConfiguration(new OzoneConfiguration());
+    return rest;
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
index 4aa5ef1..3f0f21b 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
@@ -304,8 +304,7 @@ public class TestPermissionCheck {
   @Test
   public void testMultiUploadKey() throws IOException {
     Mockito.when(objectStore.getS3Bucket(anyString())).thenReturn(bucket);
-    doThrow(exception).when(bucket)
-        .initiateMultipartUpload(anyString(), any(), any());
+    doThrow(exception).when(bucket).initiateMultipartUpload(anyString(), any());
     ObjectEndpoint objectEndpoint = new ObjectEndpoint();
     objectEndpoint.setClient(client);
     objectEndpoint.setHeaders(headers);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to